Flink Quick Start with the SQL Shell

This quick start walks you through the steps to get up and running with Confluent Cloud for Apache Flink® by using the Flink SQL shell in the Confluent CLI.


Prerequisites

You need the following prerequisites to use Confluent Cloud for Apache Flink.

  • Access to Confluent Cloud.

  • The organization ID, environment ID, and compute pool ID for your organization.

  • The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.

  • The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:

    confluent update --yes

    If you used Homebrew to install the Confluent CLI, update the CLI by running brew upgrade instead of confluent update.

    For more information, see Confluent CLI.

Step 1: Log in to Confluent Cloud with the Confluent CLI

Run the following CLI command to log in to Confluent Cloud.

confluent login --save --organization-id ${ORG_ID}

Your output should resemble:

Assuming https protocol.
Logged in as "<your-email>" for organization "<your-org-id>" ("<your-org-name>").

Step 2: Start the Flink SQL shell

Run the following command to start the Flink SQL shell. Use the environment ID and compute pool ID from the prerequisites.

confluent flink shell --compute-pool ${COMPUTE_POOL_ID} --environment ${ENV_ID}

Step 3: Submit a SQL statement

In the SQL shell, run the following statement to see Flink SQL in action. The CURRENT_TIMESTAMP function returns the local date and time.

SELECT CURRENT_TIMESTAMP;
Your output should resemble:

Statement name: ab12345c-6e11-7bcd-9
Statement successfully submitted.
Fetching results...
| 2023-07-05 18:57:53.867 |

For all functions and statements supported by Flink SQL, see Flink SQL Reference.
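
To experiment further, you can combine several built-in functions in a single statement. The following query is illustrative only; it uses standard Flink SQL scalar functions, not a specific example from this quickstart:

```sql
-- Illustrative query combining built-in Flink SQL scalar functions.
SELECT
  CURRENT_TIMESTAMP AS now_ts,  -- current date and time
  UPPER('flink') AS upper_str,  -- string function
  FLOOR(3.7) AS floored_num;    -- numeric function
```

As with the CURRENT_TIMESTAMP example, the statement is submitted to your compute pool, and the result rows appear in the results view.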

Step 4: Create and populate a table

The following steps show how to create a table, populate it with a few records, and query it to view the records it contains.

  1. Run the following statement to create a table for storing timestamped, pseudorandom float values.

    CREATE TABLE random_float_table(
      ts TIMESTAMP_LTZ(3),
      random_value FLOAT);

  2. Run the following INSERT VALUES statement to populate random_float_table with records that have a timestamp field and a float field. The timestamp values are generated by the CURRENT_TIMESTAMP function, and the float values are generated by the RAND_INTEGER(INT) function multiplied by a float.

    INSERT INTO random_float_table VALUES
      (CURRENT_TIMESTAMP, RAND_INTEGER(100) * 0.01),
      (CURRENT_TIMESTAMP, RAND_INTEGER(10000) * 0.01),
      (CURRENT_TIMESTAMP, RAND_INTEGER(1000000) * 0.01),
      (CURRENT_TIMESTAMP, RAND_INTEGER(1000000) * 0.01),
      (CURRENT_TIMESTAMP, RAND_INTEGER(100000000) * 0.01);

    Press ENTER to return to the SQL shell. Because INSERT INTO VALUES is a point-in-time statement, it exits after it completes inserting records.

  3. Run the following statement to query random_float_table for all of its records.

    SELECT * FROM random_float_table;

    Your output should resemble:

    |           ts            | random_value |
    | 2023-09-07 20:24:19.366 |         0.46 |
    | 2023-09-07 20:24:19.276 |        28.75 |
    | 2023-09-07 20:24:19.367 |       1467.2 |
    | 2023-09-07 20:24:19.368 |      7953.88 |
    | 2023-09-07 20:24:19.465 |     685883.1 |

    Press Q to exit the results view and stop the statement.

  4. Run the SHOW JOBS statement to get the status of statements in your SQL environment.

    SHOW JOBS;

    Your output should resemble:

    Statement name: dbdb79f8-7e6e-4b03
    Statement successfully submitted.
    Waiting for statement to be ready. Statement phase is PENDING.
    Statement phase is COMPLETED.
    |        Name        |   Phase   |           Statement            | Compute Pool |  Creation Time   |
    | f8f118e1-bd79-40c1 | COMPLETED | CREATE TABLE random_float_t... | lfcp-xxxxxx  | 2023-09-07 20... |
    | a30f8a59-af67-4bf6 | COMPLETED | INSERT INTO random_float_ta... | lfcp-xxxxxx  | 2023-09-07 20... |
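
Once the table contains records, you can run additional queries against it. As a hypothetical next experiment, not part of the steps above, the following query filters for rows with larger values:

```sql
-- Hypothetical follow-up query: keep only rows whose value exceeds 100.
SELECT ts, random_value
FROM random_float_table
WHERE random_value > 100;
```

Press Q to exit the results view when you're done.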

Congratulations, you have run your first Flink SQL statements on Confluent Cloud using the SQL Shell.

Next steps