Manage Savepoints in Confluent Manager for Apache Flink

You can manage Savepoint resources for both Confluent Manager for Apache Flink® (CMF) applications and statements with the CMF API, starting with CMF 2.1.0. A Savepoint resource in CMF points to a savepoint in Apache Flink, which is a consistent image of the execution state of a streaming job, created via Flink’s checkpointing mechanism.

The savepoint resource tracks the lifecycle, location, and metadata of a Flink savepoint.

Savepoint states

A CMF savepoint can have the following states in status.state:

TRIGGER_PENDING
The request to trigger the savepoint has been accepted and is waiting to be processed.
IN_PROGRESS
The savepoint operation is currently running.
COMPLETED
Savepoint creation was successful. The final location is available in status.path.
FAILED
Savepoint creation failed. Check status.error for details about the failure. You can check the number of retries in status.failures.
ABANDONED
The savepoint operation was abandoned, potentially due to a job shutdown or other interruption.
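
As a rough illustration of how a client might interpret these states, the following Python sketch summarizes a status object. It assumes the status has already been parsed from a CMF response into a dictionary with the state, path, error, and failures fields named above. The describe_savepoint helper is hypothetical and not part of CMF.

```python
from typing import Any, Dict

# The five values this section lists for status.state.
SAVEPOINT_STATES = (
    "TRIGGER_PENDING",
    "IN_PROGRESS",
    "COMPLETED",
    "FAILED",
    "ABANDONED",
)


def describe_savepoint(status: Dict[str, Any]) -> str:
    """Summarize a savepoint status object (hypothetical helper, not a CMF API)."""
    state = status.get("state")
    if state not in SAVEPOINT_STATES:
        raise ValueError(f"unknown savepoint state: {state!r}")
    if state == "COMPLETED":
        # status.path holds the final savepoint location.
        return f"completed at {status.get('path')}"
    if state == "FAILED":
        # status.error explains the failure; status.failures counts retries.
        return f"failed ({status.get('failures', 0)} retries): {status.get('error')}"
    if state == "ABANDONED":
        return "abandoned"
    # TRIGGER_PENDING and IN_PROGRESS: the savepoint is not finished yet.
    return f"not finished ({state})"
```

A caller could use the returned summary for logging, or branch on status.state directly in the same way.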

Savepoint formats

When triggering a savepoint, you can specify the format in spec.formatType:

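CANONICAL
The default format. It is unified across all state backends, so you can take a savepoint with one state backend and restore it with another.
NATIVE
A format specific to the state backend that created the savepoint. It is typically faster to create and restore than a CANONICAL savepoint.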

Note

You cannot create a CANONICAL savepoint for a job with an operator that has enableAsyncState() set.

For more information on formats, see the Apache Flink documentation on savepoint formats.

Default values

When you create a Savepoint resource, some fields in the spec have default values if not explicitly provided:

  • spec.formatType: Defaults to CANONICAL if not specified.
  • spec.backoffLimit: Defaults to -1, which means that CMF will retry a failed savepoint operation indefinitely until it succeeds or is manually abandoned.
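
To make the defaulting rules concrete, here is a minimal Python sketch that fills in the two documented defaults on the client side. CMF applies these defaults itself; the sketch only mirrors that behavior for illustration, and the helper names are hypothetical.

```python
from typing import Any, Dict

# Defaults described in this section; CMF applies them server-side.
SPEC_DEFAULTS: Dict[str, Any] = {"formatType": "CANONICAL", "backoffLimit": -1}


def effective_spec(spec: Dict[str, Any]) -> Dict[str, Any]:
    """Return the spec with the documented defaults filled in (illustrative only)."""
    merged = dict(SPEC_DEFAULTS)
    merged.update(spec)  # explicitly provided fields win over defaults
    return merged


def retries_are_unlimited(spec: Dict[str, Any]) -> bool:
    """True when the effective backoffLimit is -1 (retry indefinitely)."""
    return effective_spec(spec)["backoffLimit"] == -1
```

For example, an empty spec resolves to a CANONICAL savepoint with unlimited retries, while a spec that sets backoffLimit to 0 disables retries.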

Attached versus Detached savepoints

CMF manages two kinds of savepoints:

Attached savepoints
These are tied to a specific Flink application or statement. You can start any Flink application or statement from its attached savepoint.
Detached savepoints
These are standalone resources not tied to a specific running job. You use them to import savepoints from external systems (such as open-source Flink), to preserve savepoints from deleted jobs, and to start a Flink application from a savepoint created by another Flink application.

Manage savepoints with APIs

You can perform the following savepoint-related operations with the CMF REST API.

Trigger a new attached savepoint

You can manually trigger a new savepoint for a running Flink application or statement.

For Applications:

POST /cmf/api/v1/environments/{env}/applications/{appName}/savepoints

For Statements:

POST /cmf/api/v1/environments/{env}/statements/{stmtName}/savepoints

Request Body:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "Savepoint",
  "metadata": {
    "name": "my-manual-savepoint-1"
  },
  "spec": {
    "path": "s3://my-bucket/custom-path/",
    "formatType": "CANONICAL",
    "backoffLimit": 0
  }
}

Field Explanations:

  • metadata.name (Optional): A user-provided name for the savepoint. If the name is not provided, CMF will generate one based on the parent resource.
  • spec.path (Optional): The directory where the savepoint data should be stored. If not provided, the Flink cluster’s default configured savepoint directory will be used.
  • spec.formatType (Optional): Specifies the savepoint format. Can be CANONICAL or NATIVE. Defaults to CANONICAL. For more information, see Savepoint Formats.
  • spec.backoffLimit (Optional): The maximum number of retries if the savepoint operation fails. 0 means no retries. -1 means unlimited retries. Defaults to -1. For more information, see Default Values.
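
The request above could be assembled and sent from Python roughly as follows. This is a sketch under assumptions: BASE_URL is a placeholder for your CMF address, post_json assumes an unauthenticated endpoint that returns JSON, and the function names are hypothetical. Whether CMF accepts empty metadata or spec objects is not confirmed here.

```python
import json
import urllib.request
from typing import Any, Dict, Optional
from urllib.parse import quote

BASE_URL = "http://localhost:8080"  # assumption: replace with your CMF address


def savepoints_url(env: str, parent: str, kind: str = "applications") -> str:
    """Build the savepoints collection URL for an application or statement."""
    if kind not in ("applications", "statements"):
        raise ValueError("kind must be 'applications' or 'statements'")
    return (
        f"{BASE_URL}/cmf/api/v1/environments/{quote(env, safe='')}"
        f"/{kind}/{quote(parent, safe='')}/savepoints"
    )


def savepoint_body(
    name: Optional[str] = None,
    path: Optional[str] = None,
    format_type: Optional[str] = None,
    backoff_limit: Optional[int] = None,
) -> Dict[str, Any]:
    """Build the request body, including only the optional fields provided."""
    if format_type is not None and format_type not in ("CANONICAL", "NATIVE"):
        raise ValueError("formatType must be CANONICAL or NATIVE")
    metadata: Dict[str, Any] = {} if name is None else {"name": name}
    spec: Dict[str, Any] = {}
    if path is not None:
        spec["path"] = path
    if format_type is not None:
        spec["formatType"] = format_type
    if backoff_limit is not None:
        spec["backoffLimit"] = backoff_limit
    return {
        "apiVersion": "cmf.confluent.io/v1",
        "kind": "Savepoint",
        "metadata": metadata,
        "spec": spec,
    }


def post_json(url: str, body: Dict[str, Any]) -> Dict[str, Any]:
    """POST a JSON body; assumes no authentication and a JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling post_json(savepoints_url("dev", "my-app"), savepoint_body(backoff_limit=0)) would then trigger a savepoint with no retries, where "dev" and "my-app" stand in for your environment and application names.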

Adopt automatically-created snapshots

In addition to savepoints you trigger manually, CMF automatically adopts any savepoint created as part of a job upgrade or suspension. CMF detects these snapshots, which are created by the Flink Kubernetes Operator (FKO), and automatically creates corresponding CMF Savepoint resources. These savepoints then appear in the list of attached savepoints for that Flink application or statement, so you can manage them like any other attached savepoint.

Importing a Savepoint (Detached)

To import an existing savepoint (for example, from open-source Flink or another platform), you create a detached savepoint. This operation “registers” the existing savepoint with CMF. The state of a detached savepoint will always be COMPLETED.

POST /cmf/api/v1/detached-savepoints

Request Body:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "DetachedSavepoint",
  "metadata": {
     "name": "my-detached-savepoint-1"
  },
  "spec": {
    "path": "s3://flink/stateful-flink/checkpoints"
  }
}

Field Explanations:

  • metadata.name (Required): The name of the savepoint. This must be globally unique.
  • spec.path (Required): The full, exact path to the completed savepoint data.
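
As a small sketch of the import request, the following Python helper builds the DetachedSavepoint body and rejects missing required fields before anything is sent. The helper name is hypothetical, and the uniqueness of metadata.name can only be checked by CMF itself.

```python
from typing import Any, Dict

# Endpoint path from this section; prepend your CMF base URL when sending.
DETACHED_SAVEPOINTS_PATH = "/cmf/api/v1/detached-savepoints"


def detached_savepoint_body(name: str, path: str) -> Dict[str, Any]:
    """Build the body for registering an existing savepoint as detached."""
    if not name:
        raise ValueError("metadata.name is required")
    if not path:
        raise ValueError("spec.path is required")
    return {
        "apiVersion": "cmf.confluent.io/v1",
        "kind": "DetachedSavepoint",
        "metadata": {"name": name},
        "spec": {"path": path},  # full, exact path to the completed savepoint
    }
```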

Listing and Reading Savepoints

List Attached Savepoints

For Applications:

GET /cmf/api/v1/environments/{env}/applications/{appName}/savepoints

For Statements:

GET /cmf/api/v1/environments/{env}/statements/{stmtName}/savepoints

List Detached Savepoints

GET /cmf/api/v1/detached-savepoints

Read a Single Savepoint

For Applications:

GET /cmf/api/v1/environments/{env}/applications/{appName}/savepoints/{savepointName}

For Detached Savepoints:

GET /cmf/api/v1/detached-savepoints/{savepointName}

Detach a savepoint

You can also convert an attached savepoint into a detached savepoint. This is useful for preserving a savepoint before deleting the parent application or for sharing it with other jobs. This can be done only for savepoints in the COMPLETED state.

For Applications only:

POST /cmf/api/v1/environments/{env}/applications/{appName}/savepoints/{savepointName}/detach

Note

Detaching is only supported for FlinkApplications. Savepoints attached to a Statement cannot be detached.
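
The two preconditions for detaching (application parent, COMPLETED state) can be checked on the client before calling the endpoint. The Python sketch below is illustrative only: the helper names and the "application" and "statement" labels are hypothetical, and CMF enforces the actual rules.

```python
from urllib.parse import quote


def can_detach(parent_kind: str, state: str) -> bool:
    """True only for COMPLETED savepoints attached to a Flink application."""
    return parent_kind == "application" and state == "COMPLETED"


def detach_path(env: str, app: str, savepoint: str) -> str:
    """Build the detach endpoint path for an application's savepoint."""
    return (
        f"/cmf/api/v1/environments/{quote(env, safe='')}"
        f"/applications/{quote(app, safe='')}"
        f"/savepoints/{quote(savepoint, safe='')}/detach"
    )
```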

Delete a savepoint

The deletion behavior differs significantly between attached and detached savepoints.

Delete an attached savepoint

For Applications:

DELETE /cmf/api/v1/environments/{env}/applications/{appName}/savepoints/{savepointName}?force=(true|false)

For Statements:

DELETE /cmf/api/v1/environments/{env}/statements/{stmtName}/savepoints/{savepointName}?force=(true|false)

This endpoint performs a best-effort deletion of the physical savepoint data from the blob store.

Requirement: The corresponding Flink cluster must be running for the deletion to be processed. If the cluster is not running, the Savepoint resource will remain in CMF until the cluster is restarted and can perform the deletion.

Force Deletion: Using force=true will immediately delete the Savepoint resource from CMF and Kubernetes, regardless of whether the physical data is successfully deleted. This can lead to orphaned savepoint data in your blob store and should be used with caution.
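
A minimal Python sketch of building the delete path, with force left off unless the caller asks for it explicitly, might look like this. The function name and its keyword arguments are hypothetical; only the path layout and the force query parameter come from this section.

```python
from urllib.parse import quote, urlencode


def delete_attached_savepoint_path(
    env: str,
    parent: str,
    savepoint: str,
    kind: str = "applications",
    force: bool = False,
) -> str:
    """Build the DELETE path for a savepoint attached to an application or statement."""
    if kind not in ("applications", "statements"):
        raise ValueError("kind must be 'applications' or 'statements'")
    # force=true removes the resource even if the physical data is not deleted,
    # which can leave orphaned savepoint data in the blob store.
    query = urlencode({"force": "true" if force else "false"})
    return (
        f"/cmf/api/v1/environments/{quote(env, safe='')}"
        f"/{kind}/{quote(parent, safe='')}"
        f"/savepoints/{quote(savepoint, safe='')}?{query}"
    )
```

Defaulting force to False keeps the cautious behavior as the normal case, so a forced deletion has to be requested deliberately.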

Delete a detached savepoint

DELETE /cmf/api/v1/detached-savepoints/{savepointName}

This call deletes only the savepoint’s metadata from the CMF database. It does not delete the underlying physical savepoint data from the blob store. If you no longer need that data, you must delete it manually.

Start a job from a savepoint

Start applications

To start a Flink application from a savepoint, you have two options:

  1. Use Create/Update Application.

    When creating (POST) or updating (PUT) a Flink application, you can specify the startFromSavepoint field in the spec. You must specify one of the following mutually exclusive fields:

    • savepointName: The name of a CMF Savepoint resource (either attached or detached).
    • uid: The UID of a CMF Savepoint resource.
    • initialSavepointPath: A raw path to a savepoint file (e.g., s3://.../sp-1) that is not managed by CMF.

    This operation is analogous to standard Flink behavior: if the savepointRedeployNonce is not updated, a new deployment will not be triggered just to re-apply the savepoint.

  2. Use the Start Application API.

    You can use the start endpoint with a query parameter to force a start from a specific savepoint.

    POST /cmf/api/v1/environments/{env}/applications/{appName}/start?startFromSavepointUid={uid}

    This operation always forces the application to start from the specified savepoint UID, even if the application’s spec already references the same savepoint. This is done by automatically updating the deployment nonce to trigger a new deploy.
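
Both options can be sketched in Python as follows. The first helper enforces the mutual exclusivity of the three startFromSavepoint fields; the second builds the start endpoint path. The helper names are hypothetical, and the assumption that the three fields sit directly inside a startFromSavepoint object is inferred from the description above rather than confirmed by an API schema.

```python
from typing import Dict, Optional
from urllib.parse import quote, urlencode


def start_from_savepoint(
    savepoint_name: Optional[str] = None,
    uid: Optional[str] = None,
    initial_savepoint_path: Optional[str] = None,
) -> Dict[str, str]:
    """Return a startFromSavepoint object with exactly one reference field set."""
    candidates = {
        "savepointName": savepoint_name,
        "uid": uid,
        "initialSavepointPath": initial_savepoint_path,
    }
    provided = {key: value for key, value in candidates.items() if value is not None}
    if len(provided) != 1:
        raise ValueError(
            "specify exactly one of savepointName, uid, or initialSavepointPath"
        )
    return provided


def start_application_path(env: str, app: str, uid: str) -> str:
    """Build the start endpoint path that forces a start from a savepoint UID."""
    query = urlencode({"startFromSavepointUid": uid})
    return (
        f"/cmf/api/v1/environments/{quote(env, safe='')}"
        f"/applications/{quote(app, safe='')}/start?{query}"
    )
```

The object returned by start_from_savepoint would be placed under startFromSavepoint in the application spec sent with the create or update request.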

Start statements

You can start statements only from a savepoint that is attached to that same statement. You must use savepointName or uid to reference an existing attached savepoint in the startFromSavepoint field. Starting from a detached savepoint or an initialSavepointPath is not supported for Statements.
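
The stricter rules for statements could be checked on the client with a sketch like the one below. It assumes the caller has already collected the statement's attached savepoints into a name-to-UID mapping (for example, from the list endpoint, whose response shape is not shown in this document). The function name and that mapping are hypothetical.

```python
from typing import Dict


def validate_statement_restore(
    ref: Dict[str, str], attached: Dict[str, str]
) -> Dict[str, str]:
    """Check a startFromSavepoint reference for a statement and return it.

    `attached` maps savepoint name -> UID for savepoints attached to the
    same statement (hypothetical shape, built by the caller).
    """
    if "initialSavepointPath" in ref:
        raise ValueError("initialSavepointPath is not supported for statements")
    if set(ref) not in ({"savepointName"}, {"uid"}):
        raise ValueError("specify exactly one of savepointName or uid")
    if "savepointName" in ref and ref["savepointName"] not in attached:
        raise ValueError("savepoint is not attached to this statement")
    if "uid" in ref and ref["uid"] not in attached.values():
        raise ValueError("savepoint is not attached to this statement")
    return ref
```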

Role permissions for savepoints

Permissions for managing savepoints depend on whether the savepoint is attached or detached.

Attached savepoints

Permissions are inherited from the parent Flink application or statement:

  • Create/Detach: Requires EDIT permission on the parent. Note that you can only use detach for Flink applications.
  • View: Requires VIEW permission on the parent.
  • Delete: Requires REMOVE permission on the parent.
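
The mapping above can be written down as a small lookup, shown here as a Python sketch. It only restates the list: the operation names and the "application" and "statement" labels are hypothetical identifiers chosen for illustration, and CMF performs the real authorization checks.

```python
# Permission required on the parent resource for each attached-savepoint operation.
REQUIRED_PERMISSION = {
    "create": "EDIT",
    "detach": "EDIT",
    "view": "VIEW",
    "delete": "REMOVE",
}


def required_permission(operation: str, parent_kind: str = "application") -> str:
    """Return the permission needed on the parent for the given operation."""
    if parent_kind not in ("application", "statement"):
        raise ValueError("parent_kind must be 'application' or 'statement'")
    if operation not in REQUIRED_PERMISSION:
        raise ValueError(f"unknown operation: {operation!r}")
    if operation == "detach" and parent_kind == "statement":
        # Detach is available only for Flink applications.
        raise ValueError("detach is only supported for Flink applications")
    return REQUIRED_PERMISSION[operation]
```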

Detached savepoints

Detached savepoints are governed by the FlinkDetachedSavepoint RBAC resource. For detailed information about role permissions for detached savepoints, see Role Permissions for detached savepoints in the access control documentation.

For information about how to assign these roles and manage permissions, see Configure Access Control for Confluent Manager for Apache Flink.

Current limitations

  • Deleting attached savepoints: Deletion of an attached savepoint and its physical data can only be performed when the corresponding Flink cluster is running.
  • Deleting detached savepoints: Deleting a detached savepoint from CMF only removes the metadata. The underlying physical data in the blob store is not deleted and must be cleaned up manually.
  • Statement detach: Savepoints attached to a Statement cannot be detached.
  • Statement restore: Statements can only be started from a savepoint that is attached to that same Statement.
  • Deleting parent resources: You cannot delete a Flink application or statement while it still has attached savepoints. You must first delete them or, for Flink applications, detach them.