Carry-over Offsets in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® supports carry-over offsets, so you can use the topic offsets from one statement to start a new statement.
Carry-over offsets provide a streamlined way to update Flink statements without data loss. This feature eliminates the manual complexity of copying offsets between statements and reduces the need to monitor statement status when deploying CI/CD pipelines.
Automatic orchestration handles the upgrade process. The system automatically waits for the old statement to stop before starting the new one, providing a seamless transition of processing between statements.
Carry-over offsets are available only when replacing an existing statement. This feature enables you to evolve statements with exactly-once semantics across the update when the statement is “stateless”, as determined by the system. At a high level, “stateless” applies to statements that can process each event independently and in any order.
For other scenarios, such as aggregates, lag, windows, pattern matching, or use of an upsert sink, this feature does not apply, because the update can cause inconsistent results.
To use carry-over offsets, add the sql.tables.initial-offset-from property to the statement configuration when you create your new statement, for example:
In the Confluent Cloud Console and the Flink SQL shell, you can set the property by using the SET statement, for example:
SET 'sql.tables.initial-offset-from' = '<reference-statement-name>'
The <reference-statement-name> is the name of the statement that you want to use as the reference for the carry-over offsets.
If you’re using the Statements API or the Confluent Terraform provider, you can set the property by using the properties field, for example:
{
"properties": {
"sql.tables.initial-offset-from": "<reference-statement-name>"
}
}
Considerations for carry-over offsets
Regional limitations
The referenced statement must be in the same organization, environment, and region as the new statement.
This property does not support cross-region offset carry-over.
Timeout behavior
New statements wait up to 6 hours for the referenced statement to stop.
If the timeout expires, the new statement fails with an error message indicating the reason.
Table options priority
Explicit table options in your SQL text take precedence over inherited offsets.
Only tables without explicit options use carried-over offsets.
Example of table options priority:
INSERT INTO output
SELECT * FROM table1
UNION ALL
SELECT * FROM table2 /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
Result: table1 uses carried-over offsets, table2 uses the specified latest-offset mode.
Common issues
Statement not found error
Verify the referenced statement name is correct.
Ensure the statement exists in the same organization, environment, and region.
Timeout exceeded
Check whether the old statement is actually stopping.
Verify there are no blocking conditions preventing termination.
Invalid SQL error
Confluent Cloud validates the new statement’s syntax immediately upon creation.
Fix SQL syntax errors before the offset carry-over process begins.
Referenced statement savepoint failed
The system could not submit the statement because the referenced statement didn’t enter a stopped state gracefully. Data inconsistencies can occur when using offsets from failed savepoints.
Try to resume the referenced statement and stop it again.
If there are still issues, contact Confluent Support.
Examples
Statement already stopped
You have a stopped statement named my-original-statement.
Create a new statement with updated logic:
SET 'sql.tables.initial-offset-from' = 'my-original-statement';
INSERT INTO enhanced_output
SELECT
user_id,
event_type,
timestamp,
new_field
FROM user_events
WHERE event_type IN ('click', 'view', 'purchase');
Statement still running
Your original statement metrics-processor-v1 is still running. Create a new statement that references it:
SET 'sql.tables.initial-offset-from' = 'metrics-processor-v1';
INSERT INTO enhanced_output
SELECT
user_id,
event_type,
timestamp,
new_field
FROM user_events
WHERE event_type IN ('click', 'view', 'purchase');
The new statement remains in the “Pending” state until you stop metrics-processor-v1.