Handle Late-Arriving Data

Configure custom late data handling to prevent data loss when using aggressive watermarking strategies in Confluent Cloud for Apache Flink®.

Late data refers to events that arrive after the watermark has advanced past their event timestamp. Late data handling ensures this data is preserved for later reconciliation or backfill.
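As an illustration, consider a table with event-time semantics. The table and column names below are hypothetical; the metadata mapping mirrors the examples later in this topic.

```sql
-- Hypothetical table; the event-time column is mapped from the
-- Kafka record timestamp, and a default watermark strategy is
-- applied to it.
CREATE TABLE Clicks (
  user_id INT,
  url STRING,
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
);
-- An event whose event_time is older than the current watermark
-- at arrival is considered late.
```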

Prerequisites

  • Access to Confluent Cloud.

  • A provisioned Flink compute pool.

  • A Flink table backed by an Apache Kafka® topic with event-time semantics.

  • Understanding of watermarks and event time.

Overview

Flink provides two modes for handling late data at the source level:

pass-through (default)

Late events flow through to downstream operators. Window aggregations and other time-based operators handle late data according to their own policies. Use this mode when you want standard Flink behavior.

filter

Late events are filtered immediately at the source. The main pipeline processes only on-time data. Filtered events are preserved in a System Table (<table_name>$late) for inspection or reprocessing. Use this mode when you need strict watermark compliance in real-time pipelines while retaining all data.

Configure late data filtering

Option 1: Alter existing table

Enable filtering on an existing table:

ALTER TABLE Orders SET (
  'late-handling.mode' = 'filter'
);

This change affects all new statements that read from the table. Running statements continue with their original configuration until restarted.
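To confirm that the option took effect, you can inspect the table definition. This is a sketch; the exact output layout depends on the client you use.

```sql
-- The WITH clause in the output should now include
-- 'late-handling.mode' = 'filter'.
SHOW CREATE TABLE Orders;
```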

Option 2: Create table with filtering

Configure filtering when creating a table:

CREATE TABLE Orders (
  order_id INT,
  customer_id INT,
  amount DECIMAL(10,2),
  order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) DISTRIBUTED INTO 3 BUCKETS
WITH (
  'late-handling.mode' = 'filter'
);

Query the system table

When late-handling.mode = 'filter' is configured, late data is available in a System Table with the $late suffix. System Tables are virtual views on the source table and do not create additional physical storage.

View late data:

SELECT * FROM `Orders$late`;

Your output should resemble:

order_id  customer_id  amount    order_time
--------  -----------  ------    ----------
1001      5001         99.50     2026-04-14 10:00:04
1003      5012         250.00    2026-04-14 10:00:06

Tip

Use backticks (`) when querying tables that have $ in the name, for example `Orders$late`.

Reprocessing patterns

Combine real-time results with late data for accurate daily reporting:

-- Real-time pipeline (fast, strict watermarks)
INSERT INTO RealtimeSales
SELECT
  order_id,
  SUM(amount) OVER (
    ORDER BY order_time
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS running_total
FROM Orders;

-- End-of-day reconciliation (merges late data)
SELECT SUM(amount) AS final_daily_total
FROM (
  SELECT amount FROM RealtimeSales
  UNION ALL
  SELECT amount FROM `Orders$late`
);

Multi-table statements

When a statement reads from multiple tables, for example, in a JOIN, each table’s late-handling.mode is applied independently at the source level before the JOIN operator.

Example:

-- Orders table has filter mode
ALTER TABLE Orders SET ('late-handling.mode' = 'filter');

-- Customers table uses default pass-through
-- (no late-handling.mode set)

-- JOIN applies each table's configuration independently
SELECT o.order_id, o.amount, c.name
FROM Orders o
JOIN Customers c ON o.customer_id = c.customer_id;

Result: Late orders are filtered at the Orders source. Late customers pass through.

Reset to default behavior

Revert to the default pass-through mode:

ALTER TABLE Orders RESET ('late-handling.mode');

Important considerations

Retention and data availability

System Tables ($late) are virtual views on the source table. Late data availability is governed by the source table’s retention policy:

  • If the source table has 24-hour retention, late data expires after 24 hours

  • To preserve late data longer, explicitly persist it with INSERT INTO from the $late table
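For example, late rows can be copied into a durable table before they expire. This is a sketch; LateOrdersArchive is a hypothetical target table assumed to have a schema matching Orders.

```sql
-- Persist late events into a regular table before the source
-- table's retention policy expires them.
INSERT INTO LateOrdersArchive
SELECT * FROM `Orders$late`;
```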

Schema evolution

System Tables automatically inherit schema changes from the source table.
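For example, a column added to the source table becomes visible in the system table as well. This is a sketch; the discount column is illustrative.

```sql
-- Add a column to the source table.
ALTER TABLE Orders ADD (discount DECIMAL(10,2));

-- The $late system table exposes the same column.
SELECT order_id, discount FROM `Orders$late`;
```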