Manage Kubernetes Clusters in Confluent Manager for Apache Flink

Confluent Manager for Apache Flink® (CMF) supports managing Flink workloads across multiple Kubernetes clusters. A single CMF instance can register and manage additional clusters, deploy environments and applications on them, and monitor their health, all through the same REST API.

CMF represents every Kubernetes cluster it manages as a KubernetesCluster resource, including the cluster where CMF itself is installed (the hub cluster). By default, CMF deploys Flink resources on this hub cluster. Multi-cluster support allows additional spoke clusters to be registered. Once registered, environments can be bound to any cluster, and Flink applications, compute pools, and SQL statements can be deployed on that cluster.

This topic describes how to register, monitor, and decommission Kubernetes clusters in CMF.

Note

Kubernetes cluster management is currently supported through the REST API only.

Prerequisites

Before registering a spoke cluster:

  1. Install the Flink Kubernetes Operator (FKO) on the spoke cluster. The FKO and its Custom Resource Definitions (CRDs), namely FlinkDeployment, FlinkSessionJob, and FlinkStateSnapshot, must be present before the cluster is added to CMF. Registering a cluster before FKO is installed leads to an inconsistent state in which the cluster appears connected but workload operations are not monitored correctly. To recover, install FKO on the cluster, then decommission and recommission the cluster through the API.

    CMF validates the FKO installation when a cluster is first registered and again whenever it reconnects after a DISCONNECTED transition. This validation is advisory: if the check fails, CMF logs a warning but does not block registration.

  2. Place a kubeconfig file for the cluster in CMF’s kubeconfig directory (/etc/cmf/kubeconfig by default, configurable with cmf.k8s.kubeconfigDirectory). The filename must exactly match the cluster name used during registration. For example, a file named my-cluster registers the cluster under the name my-cluster.

  3. Ensure network and RBAC access. CMF must be able to reach the spoke cluster’s API server, and the kubeconfig must grant CMF sufficient permissions to connect to the API server and monitor Flink resources on the cluster.
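
Prerequisite 1 can be checked from a script before the kubeconfig file is put in place. The sketch below assumes you have already collected the kinds of the CRDs installed on the spoke cluster (for example, from the output of kubectl get crd). The function name is hypothetical, and this is not how CMF performs its own validation.

```python
# CRD kinds that the text says must exist before a spoke cluster is registered.
REQUIRED_FKO_KINDS = ("FlinkDeployment", "FlinkSessionJob", "FlinkStateSnapshot")


def missing_fko_kinds(installed_kinds):
    """Return the required FKO CRD kinds that are absent from installed_kinds."""
    installed = set(installed_kinds)
    return [kind for kind in REQUIRED_FKO_KINDS if kind not in installed]


# Hypothetical inventory of a cluster with an incomplete FKO installation.
print(missing_fko_kinds(["FlinkDeployment", "FlinkSessionJob"]))
```

An empty result means all three kinds are present. It does not prove that the operator itself is running.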

Kubeconfig management

CMF watches the kubeconfig directory for file changes at runtime, so clusters can be registered and deregistered without restarting CMF. Adding a kubeconfig file registers the cluster, removing it deregisters the cluster, and replacing a file (for example, for credential rotation) re-initializes the connection to that cluster automatically.
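
The three file events described above can be modeled as a comparison between two snapshots of the directory. This is only an illustration of the documented behavior, not CMF's watcher. The function name and the use of a content fingerprint per file are assumptions.

```python
def directory_events(before, after):
    """Compare two snapshots of the kubeconfig directory.

    Each snapshot maps a filename (which is also the cluster name) to a
    content fingerprint such as a hash. Returns a sorted list of
    (action, cluster_name) tuples.
    """
    events = []
    for name in after.keys() - before.keys():
        events.append(("register", name))        # new file: cluster is registered
    for name in before.keys() - after.keys():
        events.append(("deregister", name))      # removed file: cluster is deregistered
    for name in before.keys() & after.keys():
        if before[name] != after[name]:
            events.append(("reinitialize", name))  # replaced file: connection is rebuilt
    return sorted(events)


before = {"spoke-eu": "hash-1", "spoke-us-west": "hash-1"}
after = {"spoke-us-west": "hash-2", "spoke-ap": "hash-1"}
print(directory_events(before, after))
```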

Note

The maximum kubeconfig file size is 10 MB.

For more details, see the configuration reference.

Register a cluster

CMF registers a cluster automatically when you place a kubeconfig file in the kubeconfig directory (/etc/cmf/kubeconfig by default, see Configuration reference). CMF derives the cluster name from the filename, creates a KubernetesCluster record in its database, and immediately begins health monitoring.

CMF always registers the cluster where it is installed under the reserved name default-k8s-cluster. This cluster does not require a kubeconfig file. The name default-k8s-cluster is reserved and must not be used for any other cluster.

Note

Clusters cannot be manually created via the REST API. Registration is driven entirely by kubeconfig files in the watched directory.
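
Because registration is driven by the file alone, it can help to check a kubeconfig file before copying it into the watched directory. The sketch below checks the reserved name and the size limit stated in this topic. The helper name is hypothetical, and reading "10 MB" as 10 * 1024 * 1024 bytes is an assumption.

```python
import os
import tempfile

RESERVED_NAME = "default-k8s-cluster"      # reserved for the hub cluster
MAX_KUBECONFIG_BYTES = 10 * 1024 * 1024    # assumption: "10 MB" read as 10 MiB


def check_kubeconfig_candidate(path):
    """Return (cluster_name, problems) for a file that is about to be copied.

    Hypothetical pre-flight helper; CMF applies its own checks on registration.
    """
    cluster_name = os.path.basename(path)   # the cluster name is the filename
    problems = []
    if cluster_name == RESERVED_NAME:
        problems.append("name is reserved for the hub cluster")
    if os.path.getsize(path) > MAX_KUBECONFIG_BYTES:
        problems.append("file exceeds the maximum kubeconfig size")
    return cluster_name, problems


# Usage with a throwaway file standing in for a real kubeconfig.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "spoke-us-west")
with open(demo_path, "w") as handle:
    handle.write("apiVersion: v1\nkind: Config\n")
print(check_kubeconfig_candidate(demo_path))
```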

List and view clusters

Use the /cmf/api/v1/kubernetes-clusters endpoint to view the state of all registered clusters.

List all clusters:

curl http://cmf:8080/cmf/api/v1/kubernetes-clusters

Get a specific cluster:

curl http://cmf:8080/cmf/api/v1/kubernetes-clusters/my-cluster

Example response for the list request:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KubernetesClustersPage",
  "items": [
    {
      "apiVersion": "cmf.confluent.io/v1",
      "kind": "KubernetesCluster",
      "metadata": {
        "name": "default-k8s-cluster",
        "uid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
        "creationTimestamp": "2025-01-10T08:00:00Z",
        "updateTimestamp": "2025-04-01T12:00:00Z"
      },
      "spec": {
        "lifecycleState": "ACTIVE"
      },
      "status": {
        "state": "CONNECTED",
        "message": "Cluster is connected",
        "lastHeartbeatTimestamp": "2025-04-10T10:55:00Z",
        "kubernetesVersion": "v1.29.2"
      }
    },
    {
      "apiVersion": "cmf.confluent.io/v1",
      "kind": "KubernetesCluster",
      "metadata": {
        "name": "spoke-us-west",
        "uid": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
        "creationTimestamp": "2025-03-01T09:00:00Z",
        "updateTimestamp": "2025-04-01T12:00:00Z"
      },
      "spec": {
        "lifecycleState": "ACTIVE"
      },
      "status": {
        "state": "CONNECTED",
        "message": "Cluster is connected",
        "lastHeartbeatTimestamp": "2025-04-10T10:55:00Z",
        "kubernetesVersion": "v1.30.1"
      }
    }
  ]
}

The response includes the following status fields:

Field                          | Description
-------------------------------|--------------------------------------------------------------
status.state                   | CONNECTED, DISCONNECTED, or DECOMMISSIONED
status.message                 | Human-readable description of the current state or last error
status.lastHeartbeatTimestamp  | Timestamp of the last successful heartbeat
status.kubernetesVersion       | Kubernetes server version (only populated when CONNECTED)
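
A client can use the status fields to flag clusters that need attention. The sketch below works on an already-parsed list response, for example the JSON above loaded with json.loads. The function name is hypothetical.

```python
def clusters_needing_attention(page):
    """Return (name, state, message) for every cluster that is not CONNECTED."""
    flagged = []
    for item in page.get("items", []):
        status = item.get("status", {})
        state = status.get("state")
        if state != "CONNECTED":
            flagged.append((item["metadata"]["name"], state, status.get("message")))
    return flagged


# Trimmed-down page with illustrative values only.
page = {
    "kind": "KubernetesClustersPage",
    "items": [
        {"metadata": {"name": "default-k8s-cluster"},
         "status": {"state": "CONNECTED", "message": "Cluster is connected"}},
        {"metadata": {"name": "spoke-us-west"},
         "status": {"state": "DISCONNECTED", "message": "example error text"}},
    ],
}
print(clusters_needing_attention(page))
```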

Cluster states

Each cluster is in one of three states:

State           | Meaning                         | Write operations     | Read operations
----------------|---------------------------------|----------------------|-------------------------
CONNECTED       | Cluster is reachable            | Allowed              | Live data
DISCONNECTED    | Cluster is unreachable          | Rejected (HTTP 503)  | Stale data with warning
DECOMMISSIONED  | Cluster is permanently retired  | Rejected             | Stale data with warning

When a cluster transitions to DISCONNECTED:

  • All mutating operations (create, update, delete, start, suspend) on resources in environments bound to that cluster return HTTP 503 with a message identifying the cluster and its state.

  • Read (GET and list) operations return the last known data, with a status.warning field indicating that data may be stale.

Recovery is fully automatic. When CMF’s heartbeat probe succeeds on a DISCONNECTED cluster, CMF transitions the cluster back to CONNECTED and resumes normal operation. No manual intervention is required.

If a cluster still appears DISCONNECTED right after its Kubernetes API becomes reachable again, CMF has not yet run its next probe. The state updates on the next heartbeat cycle.
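
Since recovery is automatic, a client that receives HTTP 503 for a write can wait and try again. The sketch below is one possible retry loop, not a CMF client. The function name and parameters are hypothetical, and the 60-second wait is chosen to match the default heartbeat interval. A 503 can also have other causes, so inspect the response message before relying on this in practice.

```python
import time


def send_with_retry(send, attempts=5, wait_seconds=60, sleep=time.sleep):
    """Call send() until it stops returning HTTP 503 or attempts run out.

    send is any zero-argument callable that performs the request and
    returns (status_code, body). Returns the last (status_code, body) seen.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        status, body = send()
        if status != 503:
            break
        if attempt < attempts - 1:
            sleep(wait_seconds)  # give CMF time to probe the cluster again
    return status, body
```

Passing the request as a callable keeps the loop independent of the HTTP library, and the sleep parameter lets a test replace the real wait.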

Decommission a cluster

To permanently retire a cluster, set its lifecycleState to DECOMMISSIONED:

curl -X PUT http://cmf:8080/cmf/api/v1/kubernetes-clusters/my-cluster \
  -H "Content-Type: application/json" \
  -d '{
    "apiVersion": "cmf.confluent.io/v1",
    "kind": "KubernetesCluster",
    "metadata": { "name": "my-cluster" },
    "spec": { "lifecycleState": "DECOMMISSIONED" }
  }'

A cluster cannot be decommissioned while environments are bound to it. Delete all environments bound to the cluster first.

Once a cluster is decommissioned, the heartbeat stops monitoring it. To recommission the cluster, set lifecycleState back to ACTIVE:

curl -X PUT http://cmf:8080/cmf/api/v1/kubernetes-clusters/my-cluster \
  -H "Content-Type: application/json" \
  -d '{
    "apiVersion": "cmf.confluent.io/v1",
    "kind": "KubernetesCluster",
    "metadata": { "name": "my-cluster" },
    "spec": { "lifecycleState": "ACTIVE" }
  }'
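
The two requests differ only in the lifecycleState value, so a script can build both from one helper. The sketch below only constructs the URL and JSON body and sends nothing. The function name is hypothetical, and limiting the values to ACTIVE and DECOMMISSIONED is an assumption based on the two values shown in this topic.

```python
import json

# Assumption: these are the only two lifecycle values, as shown in this topic.
LIFECYCLE_STATES = ("ACTIVE", "DECOMMISSIONED")


def lifecycle_request(name, lifecycle_state, base_url="http://cmf:8080"):
    """Return (url, json_body) for a PUT that sets a cluster's lifecycleState."""
    if lifecycle_state not in LIFECYCLE_STATES:
        raise ValueError(f"unexpected lifecycleState: {lifecycle_state}")
    url = f"{base_url}/cmf/api/v1/kubernetes-clusters/{name}"
    body = {
        "apiVersion": "cmf.confluent.io/v1",
        "kind": "KubernetesCluster",
        "metadata": {"name": name},
        "spec": {"lifecycleState": lifecycle_state},
    }
    return url, json.dumps(body)


url, body = lifecycle_request("my-cluster", "DECOMMISSIONED")
print(url)
print(body)
```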

Note

Labels and annotations on a KubernetesCluster cannot be cleared to an empty set via the update API. Sending labels: {} or omitting labels from an update request preserves the existing labels unchanged. To partially remove labels, send a non-empty replacement set containing only the labels you want to keep.
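
The label rule in this note can be modeled in a few lines. The sketch below is a model of the documented behavior, not CMF code, and both function names are hypothetical.

```python
def apply_label_update(existing, requested):
    """Model the documented rule: empty or omitted labels change nothing."""
    if not requested:            # None (omitted) or {} (empty)
        return dict(existing)
    return dict(requested)       # a non-empty set replaces the labels wholesale


def labels_without(existing, keys_to_remove):
    """Build the replacement set that drops keys_to_remove from existing."""
    remaining = {k: v for k, v in existing.items() if k not in keys_to_remove}
    if not remaining:
        # An empty set would be ignored by the update, so fail early instead.
        raise ValueError("the update API cannot clear all labels")
    return remaining


current = {"region": "us-west", "tier": "prod"}
print(apply_label_update(current, {}))
print(apply_label_update(current, labels_without(current, {"tier"})))
```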

Force-delete resources from a disconnected cluster

When a cluster is DISCONNECTED or DECOMMISSIONED, normal deletion fails because CMF cannot reach the cluster. Use ?force=true to remove a resource from CMF’s database. This may leave orphaned resources on the Kubernetes cluster that will need to be cleaned up manually when the cluster becomes reachable again:

# Force-delete an application on a disconnected cluster
curl -X DELETE \
  "http://cmf:8080/cmf/api/v1/environments/my-env/applications/my-app?force=true"

# Force-delete a compute pool
curl -X DELETE \
  "http://cmf:8080/cmf/api/v1/environments/my-env/compute-pools/my-pool?force=true"

# Force-delete a SQL statement
curl -X DELETE \
  "http://cmf:8080/cmf/api/v1/environments/my-env/statements/my-statement?force=true"

# Force-delete an environment (after all resources are removed)
curl -X DELETE \
  "http://cmf:8080/cmf/api/v1/environments/my-env?force=true"

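When an environment holds several resources, the force-delete calls have to be issued with the environment last. The sketch below builds the ordered list of DELETE URLs without sending anything. The function name is hypothetical, and the relative order of applications, compute pools, and statements is an arbitrary choice that the text does not prescribe.

```python
def force_delete_urls(environment, applications=(), compute_pools=(),
                      statements=(), base_url="http://cmf:8080"):
    """Return DELETE URLs with ?force=true, resources first, environment last."""
    env_url = f"{base_url}/cmf/api/v1/environments/{environment}"
    urls = []
    for segment, names in (("applications", applications),
                           ("compute-pools", compute_pools),
                           ("statements", statements)):
        for name in names:
            urls.append(f"{env_url}/{segment}/{name}?force=true")
    urls.append(f"{env_url}?force=true")  # only after all resources are removed
    return urls


for url in force_delete_urls("my-env", applications=["my-app"],
                             compute_pools=["my-pool"]):
    print("DELETE", url)
```
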
Health monitoring (heartbeat)

CMF probes every registered cluster on a fixed interval to detect connectivity changes:

  • Interval: every 60 seconds (configurable with cmf.k8s.heartbeatIntervalSeconds)

  • Probe: queries the cluster’s Kubernetes API server version

  • Timeout: 10 seconds per cluster (configurable with cmf.k8s.heartbeatTimeoutSeconds)

  • Scope: DECOMMISSIONED clusters are skipped

After three consecutive probe failures, the cluster transitions to DISCONNECTED. On the next successful probe, the cluster transitions back to CONNECTED and normal operation resumes automatically.
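
The transition rule can be replayed over a sequence of probe results to see when a cluster changes state. The sketch below is a model of the rule as stated, not CMF's implementation, and the function name is hypothetical.

```python
FAILURE_THRESHOLD = 3  # consecutive failed probes before DISCONNECTED


def replay_probes(results, state="CONNECTED"):
    """Return the cluster state after each probe (True means the probe succeeded)."""
    failures = 0
    history = []
    for succeeded in results:
        if succeeded:
            failures = 0
            state = "CONNECTED"          # one success is enough to recover
        else:
            failures += 1
            if failures >= FAILURE_THRESHOLD:
                state = "DISCONNECTED"
        history.append(state)
    return history


# Two failures, a success that resets the count, three failures, then recovery.
print(replay_probes([False, False, True, False, False, False, True]))
```

Under this model and the default 60-second interval, an outage is reflected in the cluster state only after the third failed probe in a row.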

Set cmf.k8s.heartbeatIntervalSeconds=0 to disable heartbeat monitoring entirely.

Configuration reference

Property                          | Default              | Description
----------------------------------|----------------------|--------------------------------------------------------------------------------
cmf.k8s.heartbeatIntervalSeconds  | 60                   | Interval in seconds between heartbeat rounds. Set to 0 to disable.
cmf.k8s.heartbeatTimeoutSeconds   | 10                   | Per-cluster probe timeout in seconds. Must not exceed heartbeatIntervalSeconds.
cmf.k8s.kubeconfigDirectory       | /etc/cmf/kubeconfig  | Directory watched for spoke cluster kubeconfig files.
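
The relationship between the two heartbeat properties can be checked before a configuration is rolled out. The sketch below encodes only the two rules stated in the table. The function name is hypothetical, and skipping the timeout check when the heartbeat is disabled is an assumption.

```python
def check_heartbeat_settings(interval_seconds=60, timeout_seconds=10):
    """Classify a pair of heartbeat settings as 'disabled', 'invalid', or 'ok'."""
    if interval_seconds == 0:
        # Assumption: the timeout is irrelevant when the heartbeat is off.
        return "disabled"
    if timeout_seconds > interval_seconds:
        return "invalid"   # the timeout must not exceed the interval
    return "ok"


print(check_heartbeat_settings())          # documented defaults
print(check_heartbeat_settings(5, 10))     # timeout longer than the interval
print(check_heartbeat_settings(0, 10))     # heartbeat turned off
```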