Confluent CLI commands for Flink

Manage Flink SQL statements and compute pools in Confluent Cloud for Apache Flink®️ by using the confluent flink statement commands in the Confluent CLI. To see the available commands, use the --help option.

confluent flink statement --help
confluent flink compute-pool --help
confluent flink region --help

Use the Confluent CLI to manage these features:

For the complete CLI reference, see confluent flink statement.

In addition to the CLI, you can manage Flink statements and compute pools by using these Confluent tools:

Manage statements

Using the Confluent CLI, you can perform these actions:

Managing Flink SQL statements may require the following inputs, depending on the command:

export STATEMENT_NAME="<statement-name>" # example: "user-filter"
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export CLUSTER_ID="<kafka-cluster-id>" # example: "lkc-a1b2c3"
export PRINCIPAL_ID="<principal-id>" # example: "sa-23kgz4" for a service account, or "u-aq1dr2" for a user account
export SQL_CODE="<sql-statement-text>" # example: "SELECT * FROM USERS;"

For the complete CLI reference, see confluent flink statement.

Submit a statement

The confluent flink statement create command submits a statement in your compute pool.

Run the following command to submit a Flink SQL statement in the current compute pool with your user account.

confluent flink statement create --sql "${SQL_CODE}"

Your output should resemble:

+---------------+------------------------------------------------------------+
| Creation Date | 2024-02-28 21:08:08.9749 +0000                             |
|               | UTC                                                        |
| Name          | cli-2024-02-28-130806-78dd77b5-16a9-40ab-9786-db95b9895eaa |
| Statement     | Select 1;                                                  |
| Compute Pool  | lfcp-8m09g0                                                |
| Status        | PENDING                                                    |
+---------------+------------------------------------------------------------+

For long-running statements, Confluent recommends submitting statements with a service account instead of your user account.

The following command submits a Flink SQL statement for the specified principal in the specified compute pool and Flink database (Kafka cluster).

confluent flink statement create ${STATEMENT_NAME} \
  --service-account ${PRINCIPAL_ID} \
  --sql "${SQL_CODE}" \
  --compute-pool ${COMPUTE_POOL_ID} \
  --database ${CLUSTER_ID}

List statements

Run the confluent flink statement list command to list all of the non-deleted statements in your environment.

confluent flink statement list

Your output should resemble:

          Creation Date         |         Name         |           Statement            | Compute Pool |  Status   |         Status Detail
--------------------------------+----------------------+--------------------------------+--------------+-----------+---------------------------------
  2023-07-08 21:04:06 +0000 UTC | 4b1d3494-f0f7-460d-9 | INSERT INTO copytopic          | lfcp-r2j1x9  | RUNNING   |
                                |                      | SELECT symbol,price from       |              |           |
                                |                      | topic_datagen;                 |              |           |
  2023-07-08 21:07:04 +0000 UTC | 6c43b973-b3c6-4be8-9 | INSERT INTO copytopic          | lfcp-r2j1x9  | RUNNING   |
                                |                      | SELECT symbol,price from       |              |           |
                                |                      | topic_datagen;                 |              |           |
...

To list only the statements that you’ve created, get the context for your current Confluent Cloud login session and provide the context with the --context option.

confluent context list

Your output should resemble:

  Current |                          Name                          |    Platform     |            Credential
----------+--------------------------------------------------------+-----------------+------------------------------------
  *       |   login-<your-email-address>-https://confluent.cloud   | confluent.cloud | username-<your-email-address>

For convenience, save the context in an environment variable:

export MY_CONTEXT="login-<your-email-address>-https://confluent.cloud"

Run the confluent flink statement list command with the --context option.

confluent flink statement list ${MY_CONTEXT}

Your output should resemble:

          Creation Date          |                            Name                            | Statement | Compute Pool |  Status   | Status Detail
---------------------------------+------------------------------------------------------------+-----------+--------------+-----------+----------------
  2024-02-28 21:08:08.9749 +0000 | cli-2024-02-28-130806-78dd77b5-16a9-40ab-9786-db95b9895eaa | Select 1; | lfcp-8m09g0  | COMPLETED |
  UTC                            |                                                            |           |              |           |
...

To list only the statements in your compute pool, provide the compute pool ID with the --compute-pool option.

confluent flink statement list --compute-pool ${COMPUTE_POOL_ID}

Describe a statement

Run the confluent flink statement describe command to view the details of an existing statement.

confluent flink statement describe ${STATEMENT_NAME}

Your output should resemble:

          Creation Date         |        Name        | Statement  | Compute Pool |  Status   | Status Detail
--------------------------------+--------------------+------------+--------------+-----------+----------------
  2023-07-19 19:26:52 +0000 UTC | fdc6cbf5-038a-408c | show jobs; | lfcp-a1b2c3  | COMPLETED |

List exceptions from a statement

Run the confluent flink statement exception list command to get exceptions that have been thrown by a statement.

confluent flink statement exception list ${STATEMENT_NAME}

Delete a statement

Run the confluent flink statement delete command to delete an existing statement permanently.

  • All of its resources, like checkpoints, are also deleted.
  • Deleting a statement stops charges for its use.
confluent flink statement delete ${STATEMENT_NAME}

Your output should resemble:

Deleted Flink SQL statement "ac23db14-b5dc-49fb-b".

Manage compute pools

Using the Confluent CLI, you can perform these actions:

You must be authorized to create, update, delete (FlinkAdmin) or use (FlinkDeveloper) a compute pool. For more information, see Grant Role-Based Access in Flink SQL.

Managing compute pools may require the following inputs, depending on the command:

export COMPUTE_POOL_NAME=<compute-pool-name> # human-readable name, for example, "my-compute-pool"
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
export MAX_CFU="<max-cfu>" # example: 5

For the complete CLI reference, see confluent flink compute-pool.

Create a compute pool

Run the confluent flink compute-pool create command to create a compute pool.

Creating a compute pool requires the following inputs:

export COMPUTE_POOL_NAME=<compute-pool-name> # human-readable name, for example, "my-compute-pool"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export MAX_CFU="<max-cfu>" # example: 5

Run the following command to create a compute pool in the specified cloud provider and environment.

confluent flink compute-pool create  ${COMPUTE_POOL_NAME} \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION} \
  --max-cfu ${MAX_CFU} \
  --environment ${ENV_ID}

Your output should resemble:

+-------------+-----------------+
| Current     | false           |
| ID          | lfcp-xxd6og     |
| Name        | my-compute-pool |
| Environment | env-z3y2x1      |
| Current CFU | 0               |
| Max CFU     | 5               |
| Cloud       | AWS             |
| Region      | us-east-1       |
| Status      | PROVISIONING    |
+-------------+-----------------+

Describe a compute pool

Run the confluent flink compute-pool describe command to get details about a compute pool.

Describing a compute pool requires the following inputs:

export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"

Run the following command to get details about a compute pool in the specified environment.

confluent flink compute-pool describe ${COMPUTE_POOL_ID} \
  --environment ${ENV_ID}

Your output should resemble:

+-------------+-----------------+
| Current     | false           |
| ID          | lfcp-xxd6og     |
| Name        | my-compute-pool |
| Environment | env-z3y2x1      |
| Current CFU | 0               |
| Max CFU     | 5               |
| Cloud       | AWS             |
| Region      | us-east-1       |
| Status      | PROVISIONED     |
+-------------+-----------------+

List compute pools

Run the confluent flink compute-pool list command to compute pools in the specified environment.

Listing compute pools may require the following inputs, depending on the command:

export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"

Run the following command to get details about a compute pool in the specified environment.

confluent flink compute-pool list --environment ${ENV_ID}

Your output should resemble:

  Current |     ID      |           Name            | Environment | Current CFU | Max CFU | Cloud |  Region   |   Status
----------+-------------+---------------------------+-------------+-------------+---------+-------+-----------+--------------
  *       | lfcp-xxd6og | my-compute-pool           | env-z3y2x1  |           0 |       5 | AWS   | us-east-1 | PROVISIONED
          | lfcp-8m03rm | test-blue-compute-pool    | env-z3q9rd  |           0 |      10 | AWS   | us-east-1 | PROVISIONED
...

Update a compute pool

Run the confluent flink compute-pool update command to update a compute pool. You can update the name of the compute pool and its MAX_CFU configuration.

Updating a compute pool may require the following inputs, depending on the command:

export COMPUTE_POOL_NAME=<compute-pool-name> # human-readable name, for example, "my-compute-pool"
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export MAX_CFU="<max-cfu>" # example: 5

Run the following command to get details about a compute pool in the specified environment.

confluent flink compute-pool update ${COMPUTE_POOL_ID} \
  --environment ${ENV_ID} \
  --name ${COMPUTE_POOL_NAME} \
  --max-cfu ${MAX_CFU}

Your output should resemble:

+-------------+----------------------+
| Current     | false                |
| ID          | lfcp-xxd6og          |
| Name        | renamed-compute-pool |
| Environment | env-z3y2x1           |
| Current CFU | 0                    |
| Max CFU     | 10                   |
| Cloud       | AWS                  |
| Region      | us-east-1            |
| Status      | PROVISIONED          |
+-------------+----------------------+

Set the current compute pool

Run the confluent flink compute-pool use command to use a compute pool in subsequent commands.

Setting a compute pool requires the following inputs:

export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"

Run the following commands to set the current compute pool in the specified environment. First, you must run the confluent environment use command to set the current environment.

confluent environment use ${ENV_ID} && \
confluent flink compute-pool use ${COMPUTE_POOL_ID}

Your output should resemble:

Using environment "env-z3y2x1".
Using Flink compute pool "lfcp-xxd6og".

Unset the current compute pool

Run the confluent flink compute-pool unset command to unset the current compute pool.

Run the following command to unset the current compute pool.

confluent flink compute-pool unset

Your output should resemble:

Unset Flink compute pool "lfcp-xxd6og".

Delete a compute pool

Run the confluent flink compute-pool delete command to delete a compute pool.

Run the following command to delete a compute pool in the specified environment. The optional --force flag skips the confirmation prompt.

confluent flink compute-pool delete ${COMPUTE_POOL_ID} \
  --environment ${ENV_ID}
  --force

Your output should resemble:

Deleted Flink compute pool "lfcp-xxd6og".

Manage regions

Using the Confluent CLI, you can perform these actions:

Managing Flink SQL regions may require the following inputs, depending on the command:

export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"

For the complete CLI reference, see confluent flink region.

List available regions

Run the confluent flink region list to see all available regions where you can run Flink statements.

confluent flink region list

Your output should resemble:

  Current |             Name              | Cloud |        Region
----------+-------------------------------+-------+-----------------------
          | Belgium (europe-west1)        | gcp   | europe-west1
          | Frankfurt (eu-central-1)      | aws   | eu-central-1
          | Frankfurt (europe-west3)      | gcp   | europe-west3
          | Iowa (us-central1)            | gcp   | us-central1
          | Ireland (eu-west-1)           | aws   | eu-west-1
          | Las Vegas (us-west4)          | gcp   | us-west4
          | London (eu-west-2)            | aws   | eu-west-2
  *       | N. Virginia (us-east-1)       | aws   | us-east-1
          | N. Virginia (us-east4)        | gcp   | us-east4
          | Netherlands (westeurope)      | azure | westeurope
          | Ohio (us-east-2)              | aws   | us-east-2
          | Oregon (us-west-2)            | aws   | us-west-2
          | S. Carolina (us-east1)        | gcp   | us-east1
          | Singapore (ap-southeast-1)    | aws   | ap-southeast-1
          | Singapore (asia-southeast1)   | gcp   | asia-southeast1
          | Singapore (southeastasia)     | azure | southeastasia
          | Sydney (ap-southeast-2)       | aws   | ap-southeast-2
          | Sydney (australia-southeast1) | gcp   | australia-southeast1
          | Virginia (eastus)             | azure | eastus
          | Virginia (eastus2)            | azure | eastus2
          | Washington (westus2)          | azure | westus2

Run the following command to filter the list of available regions by cloud provider.

confluent flink region list --cloud ${CLOUD_PROVIDER}

Your output should resemble:

  Current |            Name            | Cloud |     Region
----------+----------------------------+-------+-----------------
          | Frankfurt (eu-central-1)   | aws   | eu-central-1
          | Ireland (eu-west-1)        | aws   | eu-west-1
          | London (eu-west-2)         | aws   | eu-west-2
  *       | N. Virginia (us-east-1)    | aws   | us-east-1
          | Ohio (us-east-2)           | aws   | us-east-2
          | Oregon (us-west-2)         | aws   | us-west-2
          | Singapore (ap-southeast-1) | aws   | ap-southeast-1
          | Sydney (ap-southeast-2)    | aws   | ap-southeast-2

Set the current region

Run the confluent flink region use to set the current region where subsequent Flink statements run. You must have a compute pool in the region to run statements.

confluent flink region use --cloud ${CLOUD_PROVIDER} --region ${CLOUD_REGION}

For CLOUD_PROVIDER=aws and CLOUD_REGION=us-east-2, your output should resemble:

Using Flink region "Ohio (us-east-2)".