Manage Compute Pools in Confluent Manager for Apache Flink

A compute pool in Confluent Manager for Apache Flink® (CMF) is the compute resource used to execute SQL statements. Each statement must reference a compute pool. CMF supports two types of compute pools:

  • A dedicated compute pool starts a new Flink cluster in application mode for each statement. Each statement gets its own isolated cluster with resources defined by the pool’s clusterSpec. Dedicated pools are recommended for long-running production workloads because each statement runs in full isolation with its own resource allocation.

  • A shared compute pool backs a single long-lived Flink session cluster. Multiple statements execute on the shared cluster simultaneously. Because the cluster is already running when a statement is submitted, statements start and return results much faster than on dedicated pools. Shared pools are best suited for interactive queries and short-lived workloads where low startup latency matters more than per-statement isolation.

An environment can have multiple compute pools. Each statement is associated with exactly one compute pool.

Both pool types use a clusterSpec that defines the Flink cluster resources and configuration. For dedicated pools, CMF launches a new cluster matching this spec for each statement submitted to the pool. For shared pools, the cluster is launched once when the pool is created and reused across all statements.

You can specify resources for the JobManager and TaskManagers, Flink configuration passed to those processes at startup, and the Docker image to use.

The following fields must not be set because they are automatically managed by CMF:

  • serviceAccount: CMF uses a dedicated Kubernetes service account to deploy statements.

  • job: most properties of the job spec are automatically set by CMF, including jarURI, entryClass, args, parallelism, and state.

CMF also creates two volumes and volume mounts, which you must not modify, that pass information from CMF to the Flink cluster:

  • volume: statement-plan-volume, mount path: /opt/flink/statement

  • volume: statement-encryption-volume, mount path: /opt/flink/statement-secret

The image field must be set to a confluentinc/cp-flink-sql image (or an image that uses the cp-flink-sql image as base image). CMF expects the image to contain a specific Flink job JAR and certain dependencies to be present. Shared compute pools require at least the cp7 build of this image (for example, confluentinc/cp-flink-sql:1.19-cp7).

Create a compute pool

Set spec.type to DEDICATED or SHARED to control the pool type. For dedicated pools, CMF launches a new Flink cluster for each statement submitted to the pool. For shared pools, CMF launches a single long-lived session cluster when the pool is created; statements run on that cluster. The spec.type field is immutable; you cannot change a pool’s type after creation.

The spec.state field applies only to shared pools and defaults to RUNNING if omitted:

  • RUNNING: CMF deploys the session cluster to Kubernetes when the pool is created. The cluster must be in the RUNNING phase before statements can be submitted.

  • SUSPENDED: CMF stores the pool’s metadata but does not deploy any Kubernetes resources. A pool can be created in the suspended state and activated later.

Following are example compute pool resource definitions.

Dedicated compute pool:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "ComputePool",
  "metadata": {
    "name": "pool"
  },
  "spec": {
    "type": "DEDICATED",
    "clusterSpec": {
      "flinkVersion": "v1_19",
      "image": "confluentinc/cp-flink-sql:1.19-cp7",
      "flinkConfiguration": {
        "execution.checkpointing.interval": "10s"
      },
      "taskManager": {
        "resource": {
          "cpu": 1.0,
          "memory": "1024m"
        }
      },
      "jobManager": {
        "resource": {
          "cpu": 0.5,
          "memory": "1024m"
        }
      }
    }
  }
}

Shared compute pool:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "ComputePool",
  "metadata": {
    "name": "shared-pool"
  },
  "spec": {
    "type": "SHARED",
    "state": "RUNNING",
    "clusterSpec": {
      "flinkVersion": "v1_19",
      "image": "confluentinc/cp-flink-sql:1.19-cp7",
      "flinkConfiguration": {
        "taskmanager.numberOfTaskSlots": "4",
        "execution.checkpointing.interval": "10s"
      },
      "taskManager": {
        "resource": {
          "cpu": 1.0,
          "memory": "2048m"
        }
      },
      "jobManager": {
        "resource": {
          "cpu": 1.0,
          "memory": "1024m"
        }
      }
    }
  }
}

Using the Confluent CLI:

confluent --environment env-1 flink compute-pool create /path/to/compute-pool.json

Using the REST API:

curl -v -H "Content-Type: application/json" \
 -X POST http://cmf:8080/cmf/api/v1/environments/env-1/compute-pools \
 -d @/path/to/compute-pool.json

Deployment modes

Shared compute pools support two cluster deployment modes via the spec.clusterSpec.mode field, corresponding to Flink’s Kubernetes session cluster deployment modes:

  • Native mode (default): TaskManagers are dynamically allocated and released by Flink’s own ResourceManager as statements are submitted and complete. spec.clusterSpec.taskManager.replicas is ignored in this mode. The benefit of this mode is that resource allocation adjusts dynamically to the number of submitted statements, at the cost of slightly longer statement submission times.

  • Standalone mode: Set spec.clusterSpec.mode: standalone. The Flink Kubernetes Operator manages a fixed number of TaskManager pods. Set spec.clusterSpec.taskManager.replicas to control the count; if not set, the Operator defaults to one TaskManager pod. The benefit is that TaskManager resources are pre-allocated, leading to faster statement submission times, at the cost of potential resource overallocation.
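For illustration, a shared pool pinned to a fixed TaskManager count in standalone mode might be defined as follows. This is a sketch based on the shared pool example in this document; the replica count of 2 is illustrative:

```json
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "ComputePool",
  "metadata": {
    "name": "shared-pool-standalone"
  },
  "spec": {
    "type": "SHARED",
    "state": "RUNNING",
    "clusterSpec": {
      "flinkVersion": "v1_19",
      "image": "confluentinc/cp-flink-sql:1.19-cp7",
      "mode": "standalone",
      "taskManager": {
        "replicas": 2,
        "resource": {
          "cpu": 1.0,
          "memory": "2048m"
        }
      },
      "jobManager": {
        "resource": {
          "cpu": 1.0,
          "memory": "1024m"
        }
      }
    }
  }
}
```

Omitting spec.clusterSpec.mode keeps the default native mode, in which the replicas field is ignored.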

Compute pool status

The status.phase field reflects the current state of a shared compute pool. The following phases apply to shared pools:

  • PENDING: The pool is being deployed or transitioning between states.

  • RUNNING: The session cluster is active and accepting statements.

  • SUSPENDED: The pool is stopped. No Kubernetes resources exist for the pool’s cluster. Statements cannot be submitted.

  • FAILING: The session cluster is encountering errors.

  • FAILED: The session cluster has failed. The pool can be updated or deleted regardless of statement state. Updating the pool can bring it back to a running state.

  • DELETING: The pool is being deleted.

For details on creating and managing statements on a shared compute pool, see Create Statements with Confluent Manager for Apache Flink. For savepoint management, see Manage Savepoints in Confluent Manager for Apache Flink.

Update a compute pool

You can update a compute pool’s clusterSpec by sending a PUT request with the full resource body.

curl -v -H "Content-Type: application/json" \
 -X PUT http://cmf:8080/cmf/api/v1/environments/env-1/compute-pools/<pool-name> \
 -d @/path/to/compute-pool.json

CLI update is not supported for compute pools.

The spec.type field is immutable. You cannot change a pool from DEDICATED to SHARED or vice versa. Create a new pool if you need a different type.

Statement phases can block updates to a compute pool. The following statement phases block updates for each pool type:

  • Shared (type: SHARED): PENDING, RUNNING, STOPPED, FAILING

  • Dedicated (type: DEDICATED): PENDING, RUNNING, FAILING, FAILED

For shared pools, COMPLETED and FAILED statements do not block updates. If the shared pool’s phase is FAILED, updates are allowed regardless of statement phase; updating a FAILED pool may cause statements on the pool to transition to new states.
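The blocking rules above can be sketched as a small check. This is a hypothetical helper to illustrate the logic, not part of CMF or its APIs:

```python
# Statement phases that block updating a pool, by pool type.
BLOCKING_PHASES = {
    "SHARED": {"PENDING", "RUNNING", "STOPPED", "FAILING"},
    "DEDICATED": {"PENDING", "RUNNING", "FAILING", "FAILED"},
}

def update_blocked(pool_type, pool_phase, statement_phases):
    """Return True if statements on the pool block an update."""
    # Exception: a FAILED shared pool can always be updated,
    # regardless of the phases of its statements.
    if pool_type == "SHARED" and pool_phase == "FAILED":
        return False
    blocking = BLOCKING_PHASES[pool_type]
    return any(phase in blocking for phase in statement_phases)

print(update_blocked("SHARED", "RUNNING", ["COMPLETED", "FAILED"]))  # False
print(update_blocked("SHARED", "RUNNING", ["STOPPED"]))              # True
```

Note that STOPPED blocks updates only on shared pools, while FAILED blocks updates only on dedicated pools.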

Suspend and resume a shared pool

You can stop the session cluster without deleting the pool’s metadata by setting spec.state: SUSPENDED. When suspended, no Kubernetes resources exist for the cluster. Resuming the pool redeploys the cluster.

To suspend a shared compute pool, send a PUT request with the pool’s full resource body and spec.state: SUSPENDED:

curl -v -H "Content-Type: application/json" \
 -X PUT http://cmf:8080/cmf/api/v1/environments/env-1/compute-pools/shared-pool \
 -d @/path/to/shared-compute-pool-suspended.json

Suspending shared pools is not supported in the CLI.
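For reference, the suspend body is the full shared pool definition shown earlier with only spec.state changed to SUSPENDED:

```json
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "ComputePool",
  "metadata": {
    "name": "shared-pool"
  },
  "spec": {
    "type": "SHARED",
    "state": "SUSPENDED",
    "clusterSpec": {
      "flinkVersion": "v1_19",
      "image": "confluentinc/cp-flink-sql:1.19-cp7",
      "flinkConfiguration": {
        "taskmanager.numberOfTaskSlots": "4",
        "execution.checkpointing.interval": "10s"
      },
      "taskManager": {
        "resource": {
          "cpu": 1.0,
          "memory": "2048m"
        }
      },
      "jobManager": {
        "resource": {
          "cpu": 1.0,
          "memory": "1024m"
        }
      }
    }
  }
}
```

To resume, send the same body with spec.state set back to RUNNING.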

To resume a suspended pool, send a PUT request with spec.state: RUNNING:

curl -v -H "Content-Type: application/json" \
 -X PUT http://cmf:8080/cmf/api/v1/environments/env-1/compute-pools/shared-pool \
 -d @/path/to/shared-compute-pool-running.json

Resuming shared pools is not supported in the CLI.

The pool transitions through PENDING while the session cluster starts, then reaches RUNNING when the cluster is ready.

Suspension is treated as an update operation and is blocked if any statement on the pool is in the PENDING, RUNNING, STOPPED, or FAILING phase. To clear the blocking state, delete all statements in those phases, or wait for them to reach a terminal state (COMPLETED or FAILED) naturally before suspending the pool. Note that stopping a statement puts it in the STOPPED phase, which also blocks suspension.

Delete a compute pool

A compute pool can only be deleted if no statements on the pool are in the PENDING, RUNNING, STOPPED, or FAILING phase.

Using the REST API:

curl -v -H "Content-Type: application/json" \
 -X DELETE http://cmf:8080/cmf/api/v1/environments/env-1/compute-pools/<pool-name>

Using the Confluent CLI:

confluent --environment env-1 flink compute-pool delete <pool-name>

For shared pools, if the pool’s phase is FAILED, deletion is allowed regardless of statement phase.

By default, deletion removes both the CMF pool metadata and the associated Kubernetes resources. If the Kubernetes cluster is unavailable or Kubernetes resource cleanup fails, the deletion is blocked. In this case, use the force=true query parameter to remove the pool’s metadata immediately. Note that with force=true, Kubernetes resources may not be cleaned up:

curl -v -H "Content-Type: application/json" \
 -X DELETE "http://cmf:8080/cmf/api/v1/environments/env-1/compute-pools/shared-pool?force=true"

Force deletion is not supported in the CLI.

Note

The force=true parameter bypasses Kubernetes cleanup failures but does not bypass the in-use check. Deletion is still blocked if statements are in the PENDING, RUNNING, STOPPED, or FAILING phase.

Access the Flink web UI

For per-statement web UI access on shared pools, see Manage Statements in Confluent Manager for Apache Flink.

Cluster-level web UI access exposes the entire shared session cluster, including all statements and cluster-wide metrics, in a single view. It is available only for shared compute pools and requires the AccessWebUI permission on the compute pool resource (ClusterAdmin, SystemAdmin, or super.user):

GET http://cmf:8080/cmf/api/v1/environments/{env}/compute-pools/{pool-name}/flink-web-ui/{path}

The pool must be in the RUNNING phase. For RBAC details, see Configure Access Control for Confluent Manager for Apache Flink.

Configure high availability

If the session cluster fails, statements running on it lose their state. To enable the cluster to recover and resume statements automatically, you should configure Kubernetes high availability in the pool’s flinkConfiguration:

{
  "spec": {
    "type": "SHARED",
    "state": "RUNNING",
    "clusterSpec": {
      "flinkConfiguration": {
        "high-availability.type": "kubernetes",
        "high-availability.storageDir": "s3://your-bucket/shared-pool/recovery",
        "state.checkpoints.dir": "s3://your-bucket/shared-pool/checkpoints"
      }
    }
  }
}

With Kubernetes high availability enabled, the Flink Kubernetes Operator can restart the JobManager and recover and restart statements from the latest checkpoint after a cluster failure.

Behavior on disconnected or decommissioned clusters

When the Kubernetes cluster that an environment is bound to is DISCONNECTED or DECOMMISSIONED, the behavior of compute pool operations changes:

  • Read operations such as describe and list return the last known data cached in CMF. The response includes a status.warning field indicating that the data may be stale and does not reflect the actual state on Kubernetes.

  • Write operations such as create and delete are rejected with HTTP 503.

To remove a compute pool from CMF’s database when the cluster is unreachable, use ?force=true on the delete request. This removes the resource from CMF without contacting Kubernetes. Delete all statements referencing the compute pool first.

curl -X DELETE \
  "http://cmf:8080/cmf/api/v1/environments/env-1/compute-pools/pool?force=true"

For more information on cluster states and recovery, see Manage Kubernetes Clusters in Confluent Manager for Apache Flink.