Checkpointing with Confluent Manager for Apache Flink
Checkpointing is Flink’s mechanism for creating fault-tolerant snapshots of application state. When enabled, Flink periodically snapshots the state of your streaming job, allowing it to recover from failures and continue processing with exactly-once or at-least-once semantics.
This topic describes how to configure checkpointing for your Flink applications and SQL statements deployed with Confluent Manager for Apache Flink® (CMF).
Overview
Checkpointing enables fault-tolerance by periodically creating consistent snapshots of distributed state. If a failure occurs, Flink can restore from the most recent checkpoint and resume processing.
Checkpointing is related to but distinct from savepoints:
Checkpoints are automatic, system-controlled snapshots for failure recovery
Savepoints are user-controlled, portable snapshots used for planned operations like upgrades or migrations
For information about managing savepoints with CMF, see Manage Savepoints in Confluent Manager for Apache Flink.
Key configuration properties
The following table lists essential checkpointing configuration properties:
| Property | Description | Recommendation |
|---|---|---|
| `execution.checkpointing.interval` | How often to trigger checkpoints (for example, `60s`). | The checkpointing frequency impacts many aspects of Flink: how often exactly-once sinks commit data, how much reprocessing is needed after a failure, and the performance overhead of checkpointing. Faster checkpointing is more resource intensive, but allows for faster recovery and quicker commits to external systems. |
| `state.checkpoints.dir` | Directory for storing checkpoint data. Must be accessible by all TaskManagers. | We recommend a durable distributed file system with fast data access and high availability, such as Amazon S3, Google Cloud Storage, Azure Blob Storage, or HDFS. |
| `execution.checkpointing.timeout` | Maximum time for a checkpoint to complete before it is aborted (default: `10min`). | Large state or slow storage can require longer timeouts. If checkpoints frequently time out, consider increasing this value. |
| `execution.checkpointing.min-pause` | Minimum pause between the end of one checkpoint and the start of the next (default: `0`). | Consider a scenario where checkpoints take longer than the checkpointing interval. Setting a minimum pause ensures that a Flink job is not constantly busy with checkpointing, but has resources left in between for doing useful work. |
| `state.backend.type` | State backend type: `hashmap` or `rocksdb` (default: `hashmap`). | We recommend `rocksdb` for jobs whose state may exceed available memory, because it spills to disk; `hashmap` is faster for small state. |
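Taken together, these properties live in a single `flinkConfiguration` map. The following sketch combines them; the bucket path is a placeholder, and the `10s` minimum pause is an illustrative value rather than a recommendation:

```yaml
flinkConfiguration:
  execution.checkpointing.interval: "60s"         # trigger a checkpoint every minute
  execution.checkpointing.timeout: "10min"        # abort checkpoints that take longer
  execution.checkpointing.min-pause: "10s"        # breathing room between checkpoints
  state.checkpoints.dir: "s3://my-bucket/checkpoints/"  # durable, shared storage
  state.backend.type: "rocksdb"                   # spills large state to disk
```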
For a complete list of checkpointing configuration options, see the Apache Flink Configuration Reference.
Configure checkpointing for a FlinkApplication
To configure checkpointing for a Flink Application, specify the checkpointing properties in the spec.flinkConfiguration section of your FlinkApplication resource.
The following example shows a FlinkApplication with checkpointing enabled:
```yaml
apiVersion: cmf.confluent.io/v1
kind: FlinkApplication
metadata:
  name: checkpointed-app
spec:
  image: confluentinc/cp-flink:1.19.1-cp1
  flinkVersion: v1_19
  flinkConfiguration:
    execution.checkpointing.interval: "60s"
    execution.checkpointing.timeout: "10min"
    state.checkpoints.dir: "s3://my-bucket/checkpoints/"
    state.backend.type: "rocksdb"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 1024m
      cpu: 1
  taskManager:
    resource:
      memory: 1024m
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    state: running
    parallelism: 3
    upgradeMode: stateless
```
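With `upgradeMode: stateless`, the job starts from empty state after an upgrade. If your CMF release exposes the Flink Kubernetes Operator's `last-state` upgrade mode (an assumption to verify against your version's documentation), the job section can instead restore from the latest checkpoint on upgrade, which requires checkpointing to be enabled:

```yaml
# Hypothetical variant of the job section above: resume from the
# latest checkpoint on upgrade instead of starting from empty state.
job:
  jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  state: running
  parallelism: 3
  upgradeMode: last-state   # requires checkpointing to be enabled
```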
Configure checkpointing for SQL Statements
For SQL Statements, you can configure checkpointing at two levels:
ComputePool level: Applies to all statements running in the pool. Use this for common settings such as the checkpoint storage location or authentication.
Statement level: Overrides ComputePool settings for a specific statement. Use this for settings such as a specific state backend or checkpoint interval.
Statement-level configuration takes precedence over ComputePool-level configuration.
Option 1: Configure via ComputePool
Configure checkpointing in the ComputePool’s spec.clusterSpec.flinkConfiguration to apply the settings to all statements in the pool:
```yaml
apiVersion: cmf.confluent.io/v1
kind: ComputePool
metadata:
  name: checkpointed-pool
spec:
  type: DEDICATED
  clusterSpec:
    flinkVersion: v1_19
    image: confluentinc/cp-flink-sql:1.19-cp1
    flinkConfiguration:
      execution.checkpointing.interval: "60s"
      state.checkpoints.dir: "s3://my-bucket/checkpoints/"
      state.backend.type: "rocksdb"
    taskManager:
      resource:
        cpu: 1.0
        memory: "1024m"
    jobManager:
      resource:
        cpu: 0.5
        memory: "1024m"
```
Option 2: Configure per Statement
To override checkpointing settings for a specific statement, specify the configuration in the spec.flinkConfiguration section of the Statement resource:
```yaml
apiVersion: cmf.confluent.io/v1
kind: Statement
metadata:
  name: my-statement
spec:
  statement: "INSERT INTO foo SELECT * FROM orders;"
  properties:
    sql.current-catalog: kafka-cat
    sql.current-database: kafka-db
  flinkConfiguration:
    execution.checkpointing.interval: "30s"
  computePoolName: checkpointed-pool
  parallelism: 4
```
In this example, the statement uses a 30-second checkpoint interval, overriding the 60-second interval configured in the ComputePool.
Checkpoint storage considerations
Checkpoint data must be stored in a location accessible to all TaskManagers in your Flink cluster. Common storage options include:
Amazon S3
Google Cloud Storage
Azure Blob Storage
HDFS
You must configure the appropriate credentials and access permissions for your chosen storage system. For details on configuring storage backends, see the Apache Flink Filesystem documentation.
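As one sketch of such a setup, S3 access can be supplied through Flink's `s3.*` options in the same `flinkConfiguration` map. The key names below come from the Flink S3 filesystem plugins; the endpoint and credential values are placeholders, and in production you should prefer IAM roles or a credential provider over inline secrets:

```yaml
flinkConfiguration:
  state.checkpoints.dir: "s3://my-bucket/checkpoints/"
  s3.endpoint: "https://s3.us-east-1.amazonaws.com"  # placeholder endpoint
  s3.access-key: "<ACCESS_KEY_ID>"                   # placeholder; prefer IAM roles
  s3.secret-key: "<SECRET_ACCESS_KEY>"               # placeholder; prefer IAM roles
```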