Handle Late-Arriving Data
Configure custom late data handling to prevent data loss when using aggressive watermarking strategies in Confluent Cloud for Apache Flink®.
Late data refers to events that arrive after the watermark has advanced past their event timestamp. Late data handling preserves this data for later reconciliation or backfill.
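As an illustration of that definition, the following sketch uses Flink's built-in CURRENT_WATERMARK function to flag each row whose event time is at or behind the current watermark. It assumes the Orders table used throughout this page declares order_time as its watermarked event-time column and still uses the default pass-through mode, because rows filtered at the source would not appear in the result. The flag approximates, but is not guaranteed to match exactly, the check the source applies internally.

```sql
-- Sketch: flag rows whose event time is at or behind the current watermark.
-- Assumes order_time is the watermarked event-time column of Orders and that
-- Orders uses the default pass-through mode (filtered rows would not appear here).
SELECT
  order_id,
  order_time,
  CURRENT_WATERMARK(order_time) AS current_wm,
  -- CURRENT_WATERMARK returns NULL until the first watermark is emitted,
  -- so no row counts as late before that point.
  CASE
    WHEN CURRENT_WATERMARK(order_time) IS NOT NULL
         AND order_time <= CURRENT_WATERMARK(order_time) THEN TRUE
    ELSE FALSE
  END AS is_late
FROM Orders;
```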
Prerequisites
Access to Confluent Cloud.
A provisioned Flink compute pool.
A Flink table backed by an Apache Kafka® topic with event-time semantics.
Understanding of watermarks and event time.
Overview
Flink provides two modes for handling late data at the source level:
- pass-through (default)
Late events flow through to downstream operators. Window aggregations and other time-based operators handle late data according to their own policies. Use this mode when you want standard Flink behavior.
- filter
The source filters late events immediately, so the main pipeline processes only on-time data. Filtered events remain available in a System Table named <table_name>$late for inspection or reprocessing. Use this mode when your real-time pipelines need strict watermark compliance but you still need to retain all data.
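For comparison, the effect that filter mode has on the main pipeline resembles the late-data filter pattern described in the Flink documentation, which you can write by hand against a pass-through table. The key difference is that a hand-written WHERE clause discards late rows outright, whereas filter mode keeps them reachable through the $late System Table. This sketch assumes order_time is the watermarked event-time column.

```sql
-- Sketch: drop late rows manually in a pass-through pipeline.
-- A row is kept while no watermark exists yet (NULL) or when it is ahead of the watermark.
-- Unlike filter mode, the dropped rows are not retained anywhere.
SELECT order_id, customer_id, amount, order_time
FROM Orders
WHERE CURRENT_WATERMARK(order_time) IS NULL
   OR order_time > CURRENT_WATERMARK(order_time);
```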
Configure late data filtering
Option 1: Alter existing table
Enable filtering on an existing table:
ALTER TABLE Orders SET (
'late-handling.mode' = 'filter'
);
This change affects all new statements that read from the table. Running statements continue with their original configuration until restarted.
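To check which mode a table currently uses, you can print its definition. SHOW CREATE TABLE is standard Flink SQL; the assumption here is that late-handling.mode, once set, is listed with the other table options in the WITH clause of the output.

```sql
-- Print the full table definition, including its WITH options.
-- After running the ALTER TABLE statement above, look for
-- 'late-handling.mode' = 'filter' among the listed options.
SHOW CREATE TABLE Orders;
```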
Option 2: Create table with filtering
Configure filtering when creating a table:
CREATE TABLE Orders (
order_id INT,
customer_id INT,
amount DECIMAL(10,2),
order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) DISTRIBUTED INTO 3 BUCKETS
WITH (
'late-handling.mode' = 'filter'
);
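Filtering matters most when the watermark is strict. As a sketch, the following variant declares an explicit watermark that tolerates only 1 second of out-of-order data, so events arriving later than that are routed to the $late System Table instead of the main pipeline. The table name OrdersStrict and the 1-second bound are illustrative assumptions; choose a bound that matches how out of order your data actually arrives.

```sql
-- Hypothetical table: same schema as Orders, plus an explicit, aggressive watermark.
CREATE TABLE OrdersStrict (
  order_id INT,
  customer_id INT,
  amount DECIMAL(10,2),
  order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- The watermark trails the highest order_time observed by 1 second;
  -- events whose order_time is at or behind it are treated as late.
  WATERMARK FOR order_time AS order_time - INTERVAL '1' SECOND
) DISTRIBUTED INTO 3 BUCKETS
WITH (
  'late-handling.mode' = 'filter'
);
```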
Query the system table
When you set late-handling.mode = 'filter', late data appears in a System Table with the $late suffix. System Tables are virtual views on the source table and do not create additional physical storage.
View late data:
SELECT * FROM `Orders$late`;
Your output should resemble:
order_id  customer_id  amount  order_time
--------  -----------  ------  -------------------
1001      5001         99.50   2026-04-14 10:00:04
1003      5012         250.00  2026-04-14 10:00:06
Tip
Use backticks (`) to quote table names that contain $ when you query them.
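Because the System Table can be queried like any other table, you can aggregate it to see where late data is concentrated. A minimal sketch that totals late orders per customer:

```sql
-- Summarize late events per customer.
-- The table name needs backticks because it contains $.
SELECT
  customer_id,
  COUNT(*) AS late_orders,
  SUM(amount) AS late_amount
FROM `Orders$late`
GROUP BY customer_id;
```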
Reprocessing patterns
Combine real-time results with late data for accurate daily reporting:
-- Real-time pipeline (fast, strict watermarks)
INSERT INTO RealtimeSales
SELECT
order_id,
amount, -- keep the per-order amount so the reconciliation query can sum it
SUM(amount) OVER (
ORDER BY order_time
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total
FROM Orders;
-- End-of-day reconciliation (merges late data)
SELECT SUM(amount) AS final_daily_total
FROM (
SELECT amount FROM RealtimeSales
UNION ALL
SELECT amount FROM `Orders$late`
);
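For reporting that is broken down by day, one variant reads the filtered Orders table and the $late System Table directly and groups by calendar date. It relies on the behavior described above, where a table in filter mode returns only on-time rows, so each order should be counted exactly once across the two branches. Casting order_time to DATE inside each branch keeps the event-time attribute out of the UNION ALL; the date is derived in the session time zone. This is a sketch, not a prescribed reconciliation method.

```sql
-- Sketch: daily totals that combine on-time and late orders.
-- Assumes Orders is in filter mode, so its rows and `Orders$late` do not overlap.
SELECT order_day, SUM(amount) AS daily_total
FROM (
  SELECT CAST(order_time AS DATE) AS order_day, amount FROM Orders
  UNION ALL
  SELECT CAST(order_time AS DATE) AS order_day, amount FROM `Orders$late`
) AS all_orders
GROUP BY order_day;
```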
Multi-table statements
When a statement reads from multiple tables, for example, in a JOIN, Flink applies each table’s late-handling.mode independently at the source level before the JOIN operator.
Example:
-- Orders table has filter mode
ALTER TABLE Orders SET ('late-handling.mode' = 'filter');
-- Customers table uses default pass-through
-- (no late-handling.mode set)
-- JOIN applies each table's configuration independently
SELECT o.order_id, o.amount, c.name
FROM Orders o
JOIN Customers c ON o.customer_id = c.customer_id;
Result: The Orders source filters out late orders, while late Customers records pass through to the JOIN.
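Late orders that the Orders source filters out never reach the JOIN above. If the enriched result must eventually include them, one approach is to run the same JOIN against the System Table and combine the two result sets downstream. This sketch assumes Customers has a name column, as in the query above.

```sql
-- Sketch: enrich the late orders that the Orders source filtered out,
-- using the same join condition as the main query.
SELECT o.order_id, o.amount, c.name
FROM `Orders$late` o
JOIN Customers c ON o.customer_id = c.customer_id;
```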
Reset to default behavior
Revert to the default pass-through mode:
ALTER TABLE Orders RESET ('late-handling.mode');
Important considerations
Retention and data availability
System Tables ($late) are virtual views on the source table. The source table’s retention policy governs late data availability:
If the source table has 24-hour retention, late data expires after 24 hours.
To preserve late data longer, explicitly persist it with an INSERT INTO statement that selects from the $late table.
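A compact way to persist late data is CREATE TABLE AS SELECT, which creates the target table and starts a statement that keeps copying new late rows into it. The table name LateOrdersArchive is hypothetical, and this sketch does not set a retention period; configure the new table's retention to match how long you need the data.

```sql
-- Sketch: copy late events into a regular table whose retention you control.
-- LateOrdersArchive is a hypothetical name.
CREATE TABLE LateOrdersArchive AS
SELECT order_id, customer_id, amount, order_time
FROM `Orders$late`;
```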
Schema evolution
System Tables automatically inherit schema changes from the source table.
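To confirm that a System Table tracks its source after a schema change, you can compare the two column lists. This sketch assumes DESCRIBE works on System Tables the same way it does on regular tables.

```sql
-- Compare column lists; after a schema change on Orders, both should match.
DESCRIBE Orders;
DESCRIBE `Orders$late`;
```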