Handle Late-Arriving Data
Configure custom late data handling to prevent data loss when using aggressive watermarking strategies in Confluent Cloud for Apache Flink®.
Late data refers to events that arrive after the watermark has advanced past their event timestamp. Late data handling ensures this data is preserved for later reconciliation or backfill.
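As an illustration, lateness is determined by the table's watermark definition. In the hypothetical table below (the table name and the 5-second bound are assumptions for illustration), the watermark trails the maximum observed event timestamp by 5 seconds, so an event whose order_time falls behind the current watermark is considered late:

-- Hypothetical table: watermark trails max observed event time by 5 seconds.
CREATE TABLE Orders (
  order_id   INT,
  order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
);
-- If the watermark has advanced to 10:00:10, an event with
-- order_time = 10:00:04 arrives behind the watermark and is late.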
Prerequisites
Access to Confluent Cloud.
A provisioned Flink compute pool.
A Flink table backed by an Apache Kafka® topic with event-time semantics.
Understanding of watermarks and event time.
Overview
Flink provides two modes for handling late data at the source level:
- pass-through (default)
Late events flow through to downstream operators. Window aggregations and other time-based operators handle late data according to their own policies. Use this mode when you want standard Flink behavior.
- filter
Late events are filtered immediately at the source. The main pipeline processes only on-time data. Filtered events are preserved in a System Table (<table_name>$late) for inspection or reprocessing. Use this mode when you need strict watermark compliance in real-time pipelines while retaining all data.
Configure late data filtering
Option 1: Alter existing table
Enable filtering on an existing table:
ALTER TABLE Orders SET (
'late-handling.mode' = 'filter'
);
This change affects all new statements that read from the table. Running statements continue with their original configuration until restarted.
Option 2: Create table with filtering
Configure filtering when creating a table:
CREATE TABLE Orders (
order_id INT,
customer_id INT,
amount DECIMAL(10,2),
order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) DISTRIBUTED INTO 3 BUCKETS
WITH (
'late-handling.mode' = 'filter'
);
Query the system table
When late-handling.mode = 'filter' is configured, late data is available in a System Table with the $late suffix. System Tables are virtual views on the source table and do not create additional physical storage.
View late data:
SELECT * FROM `Orders$late`;
Your output should resemble:
order_id customer_id amount order_time
-------- ----------- ------ ----------
1001 5001 99.50 2026-04-14 10:00:04
1003 5012 250.00 2026-04-14 10:00:06
Tip
Use backticks (`) when querying tables with $ in the name.
Reprocessing patterns
Combine real-time results with late data for accurate daily reporting:
-- Real-time pipeline (fast, strict watermarks)
INSERT INTO RealtimeSales
SELECT
order_id,
amount,
SUM(amount) OVER (
ORDER BY order_time
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total
FROM Orders;
-- End-of-day reconciliation (merges late data)
SELECT SUM(amount) AS final_daily_total
FROM (
SELECT amount FROM RealtimeSales
UNION ALL
SELECT amount FROM `Orders$late`
);
Multi-table statements
When a statement reads from multiple tables, for example, in a JOIN, each table’s late-handling.mode is applied independently at the source level before the JOIN operator.
Example:
-- Orders table has filter mode
ALTER TABLE Orders SET ('late-handling.mode' = 'filter');
-- Customers table uses default pass-through
-- (no late-handling.mode set)
-- JOIN applies each table's configuration independently
SELECT o.order_id, o.amount, c.name
FROM Orders o
JOIN Customers c ON o.customer_id = c.customer_id;
Result: Late orders are filtered at the Orders source. Late customers pass through.
Reset to default behavior
Revert to the default pass-through mode:
ALTER TABLE Orders RESET ('late-handling.mode');
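To confirm the property is gone, you can inspect the current table definition; 'late-handling.mode' should no longer appear in the output (the exact output shape depends on your environment):

SHOW CREATE TABLE Orders;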
Important considerations
Retention and data availability
System Tables ($late) are virtual views on the source table. Late data availability is governed by the source table’s retention policy:
- If the source table has 24-hour retention, late data expires after 24 hours.
- To preserve late data longer, explicitly persist it with INSERT INTO from the $late table.
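For example, one way to archive late events beyond the source retention window is a continuous statement that copies them into a dedicated table. This is a sketch; the LateOrdersArchive table name is an assumption for illustration:

-- Hypothetical archive table with its own retention settings.
CREATE TABLE LateOrdersArchive (
order_id INT,
customer_id INT,
amount DECIMAL(10,2),
order_time TIMESTAMP_LTZ(3)
);

-- Continuously persist late events before source retention expires them.
INSERT INTO LateOrdersArchive
SELECT order_id, customer_id, amount, order_time
FROM `Orders$late`;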
Schema evolution
System Tables automatically inherit schema changes from the source table.
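For example, if a column is added to the source table, subsequent reads of the System Table include it as well. This is a sketch; the region column is an assumption for illustration:

-- Add a column to the source table.
ALTER TABLE Orders ADD (region STRING);

-- New reads of the System Table see the added column.
SELECT order_id, region FROM `Orders$late`;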