Checkpointing with Confluent Manager for Apache Flink

Checkpointing is Flink’s mechanism for creating fault-tolerant snapshots of application state. When enabled, Flink periodically snapshots the state of your streaming job, allowing it to recover from failures and continue processing with exactly-once or at-least-once semantics.

This topic describes how to configure checkpointing for your Flink applications and SQL statements deployed with Confluent Manager for Apache Flink® (CMF).

Overview

Checkpointing enables fault tolerance by periodically creating consistent snapshots of distributed state. If a failure occurs, Flink can restore from the most recent completed checkpoint and resume processing.

Checkpointing is related to but distinct from savepoints:

  • Checkpoints are automatic, system-controlled snapshots for failure recovery

  • Savepoints are user-controlled, portable snapshots used for planned operations like upgrades or migrations

For information about managing savepoints with CMF, see Manage Savepoints in Confluent Manager for Apache Flink.

Key configuration properties

The following properties are essential for configuring checkpointing:

execution.checkpointing.interval
    Description: How often to trigger checkpoints (for example, 60s, 5min). Required to enable checkpointing.
    Recommendation: The checkpointing frequency affects many aspects of Flink: how often exactly-once sinks commit data, how much reprocessing is needed after a failure, and the performance overhead of checkpointing. More frequent checkpointing is more resource intensive, but allows for faster recovery and quicker commits to external systems.

state.checkpoints.dir
    Description: Directory for storing checkpoint data. Must be accessible by all TaskManagers.
    Recommendation: Use a durable distributed file system with fast data access and high availability, such as Amazon S3, Google Cloud Storage, Azure Blob Storage, or HDFS.

execution.checkpointing.timeout
    Description: Maximum time for a checkpoint to complete before it is aborted (default: 10min).
    Recommendation: Large state or slow storage can require longer timeouts. If checkpoints frequently time out, consider increasing this value.

execution.checkpointing.min-pause
    Description: Minimum time between the completion of one checkpoint and the start of the next (default: 0).
    Recommendation: If checkpoints take longer than the checkpointing interval, a minimum pause ensures that the job is not constantly busy with checkpointing and has resources left in between for useful work.

state.backend.type
    Description: State backend type: hashmap (in-memory) or rocksdb (on-disk).
    Recommendation: We recommend rocksdb for its robustness; hashmap is useful for small state or very performance-sensitive workloads.

For a complete list of checkpointing configuration options, see the Apache Flink Configuration Reference.
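Taken together, these properties typically appear as a single flinkConfiguration block. The following sketch pairs a 60-second interval with a minimum pause so that back-to-back checkpoints leave headroom for regular processing; the bucket name is a placeholder:

```yaml
# Illustrative flinkConfiguration fragment; the S3 bucket name is a placeholder.
flinkConfiguration:
  execution.checkpointing.interval: "60s"     # trigger a checkpoint every minute
  execution.checkpointing.min-pause: "30s"    # wait at least 30s after one completes
  execution.checkpointing.timeout: "10min"    # abort checkpoints that run longer
  state.checkpoints.dir: "s3://my-bucket/checkpoints/"
  state.backend.type: "rocksdb"
```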

Configure checkpointing for a FlinkApplication

To configure checkpointing for a FlinkApplication, specify the checkpointing properties in the spec.flinkConfiguration section of your FlinkApplication resource.

The following example shows a FlinkApplication with checkpointing enabled:

apiVersion: cmf.confluent.io/v1
kind: FlinkApplication
metadata:
  name: checkpointed-app
spec:
  image: confluentinc/cp-flink:1.19.1-cp1
  flinkVersion: v1_19
  flinkConfiguration:
    execution.checkpointing.interval: "60s"
    execution.checkpointing.timeout: "10min"
    state.checkpoints.dir: "s3://my-bucket/checkpoints/"
    state.backend.type: "rocksdb"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 1024m
      cpu: 1
  taskManager:
    resource:
      memory: 1024m
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    state: running
    parallelism: 3
    upgradeMode: stateless

Configure checkpointing for SQL Statements

For SQL Statements, you can configure checkpointing at two levels:

  • ComputePool level: Applies to all statements running in the pool. Use this for common settings such as checkpoint storage location or authentication.

  • Statement level: Overrides ComputePool settings for a specific statement. Use this for configuring a specific state backend or checkpoint interval.

Statement-level configuration takes precedence over ComputePool-level configuration.

Option 1: Configure via ComputePool

Configure checkpointing in the ComputePool’s spec.clusterSpec.flinkConfiguration to apply the settings to all statements in the pool:

apiVersion: cmf.confluent.io/v1
kind: ComputePool
metadata:
  name: checkpointed-pool
spec:
  type: DEDICATED
  clusterSpec:
    flinkVersion: v1_19
    image: confluentinc/cp-flink-sql:1.19-cp1
    flinkConfiguration:
      execution.checkpointing.interval: "60s"
      state.checkpoints.dir: "s3://my-bucket/checkpoints/"
      state.backend.type: "rocksdb"
    taskManager:
      resource:
        cpu: 1.0
        memory: "1024m"
    jobManager:
      resource:
        cpu: 0.5
        memory: "1024m"

Option 2: Configure per Statement

To override checkpointing settings for a specific statement, specify the configuration in the spec.flinkConfiguration section of the Statement resource:

apiVersion: cmf.confluent.io/v1
kind: Statement
metadata:
  name: my-statement
spec:
  statement: "INSERT INTO foo SELECT * FROM orders;"
  properties:
    sql.current-catalog: kafka-cat
    sql.current-database: kafka-db
  flinkConfiguration:
    execution.checkpointing.interval: "30s"
  computePoolName: checkpointed-pool
  parallelism: 4

In this example, the statement uses a 30-second checkpoint interval, overriding the 60-second interval configured in the ComputePool.
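Conceptually, the statement runs with the merged configuration shown below, combining the ComputePool settings with the statement's override. This is an illustration of the precedence rule, not output produced by CMF:

```yaml
# Effective configuration for my-statement (illustrative)
execution.checkpointing.interval: "30s"              # from the Statement (overrides the pool's 60s)
state.checkpoints.dir: "s3://my-bucket/checkpoints/" # inherited from the ComputePool
state.backend.type: "rocksdb"                        # inherited from the ComputePool
```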

Checkpoint storage considerations

Checkpoint data must be stored in a location accessible to all TaskManagers in your Flink cluster. Common storage options include:

  • Amazon S3

  • Google Cloud Storage

  • Azure Blob Storage

  • HDFS

You must configure the appropriate credentials and access permissions for your chosen storage system. For details on configuring storage backends, see the Apache Flink Filesystem documentation.
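As an example, Flink's S3 filesystem can read credentials from the cluster configuration. The keys below are standard Flink options, but the values are placeholders; on AWS, an IAM role or instance profile attached to the service account is usually preferable to static keys:

```yaml
# Illustrative fragment; placeholder credentials. Prefer IAM roles where available.
flinkConfiguration:
  state.checkpoints.dir: "s3://my-bucket/checkpoints/"
  s3.access-key: "<access-key-id>"
  s3.secret-key: "<secret-access-key>"
```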