Flink SQL Quick Start with the SQL Shell¶
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
This quick start walks you through the following steps to get you up and running with Confluent Cloud for Apache Flink®️.
- Step 1: Log in to Confluent Cloud with the Confluent CLI
- Step 2: Start the Flink SQL shell
- Step 3: Submit a SQL statement
- Step 4: Create and populate a table
Prerequisites¶
You need the following prerequisites to use Confluent Cloud for Apache Flink®️.
Access to Confluent Cloud.
Your organization ID, environment ID, and compute pool ID at hand.
The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:
confluent update --yes
If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command instead of confluent update. For more information, see Confluent CLI.
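If you don’t have these IDs at hand, the Confluent CLI can list the available resources. The following is a sketch of commands that may help; exact flags and output vary by CLI version and account:

```shell
# List environments to find the environment ID (env-xxxxxx).
confluent environment list

# List Flink compute pools in an environment to find the pool ID (lfcp-xxxxxx).
confluent flink compute-pool list --environment ${ENV_ID}
```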
Step 1: Log in to Confluent Cloud with the Confluent CLI¶
Run the following CLI command to log in to Confluent Cloud.
confluent login --save --organization-id ${ORG_ID}
Your output should resemble:
Assuming https protocol.
Logged in as "<your-email>" for organization "<your-org-id>" ("<your-org-name>").
Step 2: Start the Flink SQL shell¶
Start the Flink SQL shell by running the confluent flink shell command. The shell connects to Confluent Cloud and submits your statements to the compute pool that you specify.
Important
This guide focuses on ad-hoc statements. To run statements as long-running jobs, provide the --service-account option in the confluent flink shell command. When you start the shell without this option, statements run with your user account, and the system terminates them automatically after four hours. For more information, see Service Accounts for Confluent Cloud.
Run the following CLI command to start the Flink SQL shell.
confluent flink shell --compute-pool ${COMPUTE_POOL_ID} --environment ${ENV_ID}
Your output should resemble:
Warning: no service account provided. To ensure that your statements run continuously, switch to using a service account instead of your user identity with `confluent iam service-account use` or `--service-account`. Otherwise, statements will stop running after 4 hours.
Welcome!
To exit, press Ctrl-Q or type "exit".
[Ctrl-Q] Quit [Ctrl-S] Toggle Smart Completion
>
You’re ready to start processing data by submitting statements to Flink SQL.
Step 3: Submit a SQL statement¶
In the SQL shell, run the following statement to see Flink SQL in action. The CURRENT_TIMESTAMP function returns the local date and time.
SELECT CURRENT_TIMESTAMP;
Your output should resemble:
Statement name: ab12345c-6e11-7bcd-9
Statement successfully submitted.
Fetching results...
+-------------------------+
| CURRENT_TIMESTAMP |
+-------------------------+
| 2023-07-05 18:57:53.867 |
+-------------------------+
For all functions and statements supported by Flink SQL, see Flink SQL Reference.
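As a further illustration, not part of the original walkthrough, you can combine several built-in functions in a single ad-hoc query. The following sketch uses standard Flink SQL functions (CURRENT_DATE, UPPER, FLOOR, RAND); your returned values will differ:

```sql
SELECT
  CURRENT_DATE       AS today,       -- current local date
  UPPER('flink')     AS shouted,     -- string function example
  FLOOR(RAND() * 100) AS random_int; -- pseudorandom integer in [0, 99]
```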
Step 4: Create and populate a table¶
The following steps show how to create a table, populate it with a few records, and query it to view the records it contains.
Run the following statement to create a table for storing timestamped pseudorandom floating-point values.
CREATE TABLE random_float_table (
  ts TIMESTAMP_LTZ(3),
  random_value FLOAT
);
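To confirm the schema before inserting data, you can run the standard Flink SQL DESCRIBE statement; the exact output columns may vary in Confluent Cloud:

```sql
DESCRIBE random_float_table;
```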
Run the following INSERT VALUES statement to populate random_float_table with records that have a timestamp field and a float field. The timestamp values are generated by the CURRENT_TIMESTAMP function, and the float values are generated by the RAND_INTEGER(INT) function multiplied by a float constant.

INSERT INTO random_float_table VALUES
  (CURRENT_TIMESTAMP, RAND_INTEGER(100)*0.02),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000)*0.05),
  (CURRENT_TIMESTAMP, RAND_INTEGER(10000)*0.20),
  (CURRENT_TIMESTAMP, RAND_INTEGER(100000)*0.22),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000000)*0.7);
Press ENTER to return to the SQL shell. Because INSERT INTO VALUES is a point-in-time statement, it exits after it completes inserting records.
Run the following statement to query random_float_table for all of its records.

SELECT * FROM random_float_table;
Your output should resemble:
ts                       random_value
2023-09-07 20:24:19.366  0.46
2023-09-07 20:24:19.276  28.75
2023-09-07 20:24:19.367  1467.2
2023-09-07 20:24:19.368  7953.88
2023-09-07 20:24:19.465  685883.1
Press Q to exit the results view and stop the statement.
Run the SHOW JOBS statement to get the status of statements in your SQL environment.
SHOW JOBS;
Your output should resemble:
Statement name: dbdb79f8-7e6e-4b03
Statement successfully submitted.
Waiting for statement to be ready.
Statement phase is PENDING.
Statement phase is COMPLETED.
+--------------------+-----------+--------------------------------+--------------+------------------+
|        Name        |   Phase   |           Statement            | Compute Pool |  Creation Time   |
+--------------------+-----------+--------------------------------+--------------+------------------+
| f8f118e1-bd79-40c1 | COMPLETED | CREATE TABLE random_float_t... | lfcp-xxxxxx  | 2023-09-07 20... |
| a30f8a59-af67-4bf6 | COMPLETED | INSERT INTO random_float_ta... | lfcp-xxxxxx  | 2023-09-07 20... |
+--------------------+-----------+--------------------------------+--------------+------------------+
Congratulations! You have run your first Flink SQL statements on Confluent Cloud by using the SQL shell.