Deduplication Queries

Important

Confluent Cloud for Apache Flink® is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.

For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.

Confluent Cloud for Apache Flink® enables removing duplicate rows over a set of columns in a Flink SQL table.

Syntax

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY column1[, column2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

Parameter Specification

Note

This query pattern must be followed exactly; otherwise, the optimizer can’t translate the query.

  • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
  • PARTITION BY column1[, column2...]: Specifies the partition columns, which form the deduplication key.
  • ORDER BY time_attr [asc|desc]: Specifies the ordering column, which must be a time attribute. Flink SQL supports the processing-time attribute and the event-time attribute; a processing-time sketch follows this list. Ordering by ASC keeps the first row per key, and ordering by DESC keeps the last row.
  • WHERE rownum = 1: The rownum = 1 predicate is required for Flink SQL to recognize that the query performs deduplication.
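
For example, when the source table has no usable event-time column, you can deduplicate on processing time. The following is a minimal sketch, not part of the example later in this topic: it assumes a hypothetical orders table and that your environment supports Flink’s computed-column syntax and PROCTIME() function.

CREATE TABLE orders (
  order_id VARCHAR,
  product VARCHAR,
  -- Processing-time attribute declared as a computed column (assumed available).
  proctime AS PROCTIME()
);

-- Keep the first row observed for each order_id.
SELECT order_id, product
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY order_id
       ORDER BY proctime ASC) AS rownum
   FROM orders)
WHERE rownum = 1;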

Description

Deduplication removes duplicate rows over a set of columns, keeping only the first or last row.

Flink SQL uses the ROW_NUMBER() function to remove duplicates, similar to its usage in Top-N Queries. Deduplication is a special case of the Top-N query, in which N is 1 and row order is by processing time or event time.
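
For comparison, relaxing the filter to rownum <= N turns the same pattern into a Top-N query. The following sketch assumes a hypothetical events table and keeps the two most recent events per user; deduplication is the special case in which N is 1.

SELECT user_id, payload, event_time
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY user_id
       ORDER BY event_time DESC) AS rownum
   FROM events)
WHERE rownum <= 2;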

In some cases, an upstream ETL job isn’t end-to-end exactly-once, which can cause duplicate records in the sink after a failover. Duplicate records affect the correctness of downstream analytical jobs, like SUM and COUNT, so deduplication is required before further analysis can continue.
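
For example, counting clicks per URL directly over a stream that contains duplicates overcounts, while counting over the deduplicated result does not. The following is a minimal sketch that assumes the clicks table created in the example below.

SELECT url, COUNT(*) AS unique_clicks
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY ip_address, url
       ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(click_ts_raw))) AS rownum
   FROM clicks)
WHERE rownum = 1
GROUP BY url;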

Example

In the Flink SQL shell or in a Cloud Console workspace, run the following commands to see an example of row deduplication. The rows are partitioned by the ip_address column and ordered by the click_ts_raw column, which is a Unix timestamp.

  1. Create a table for web page click events.

    CREATE TABLE clicks (
      ip_address VARCHAR,
      url VARCHAR,
      click_ts_raw BIGINT
    );
    
  2. Populate the table with mock clickstream data.

    INSERT INTO clicks
    VALUES( '10.0.0.1',  'https://acme.com/index.html',     1692812175),
          ( '10.0.0.12', 'https://apache.org/index.html',   1692826575),
          ( '10.0.0.13', 'https://confluent.io/index.html', 1692826575),
          ( '10.0.0.1',  'https://acme.com/index.html',     1692812175),
          ( '10.0.0.12', 'https://apache.org/index.html',   1692819375),
          ( '10.0.0.13', 'https://confluent.io/index.html', 1692826575);
    
  3. Run the following statement to return the deduplicated rows.

    SELECT ip_address, url, FROM_UNIXTIME(click_ts_raw) AS click_timestamp
    FROM (
            SELECT *,
              ROW_NUMBER() OVER (PARTITION BY ip_address
                ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(click_ts_raw))) AS rownum
            FROM clicks
         )
    WHERE rownum = 1;
    

    Your output should resemble:

    ip_address url                              click_timestamp
    10.0.0.1   https://acme.com/index.html      2023-08-23 10:36:15
    10.0.0.13  https://confluent.io/index.html  2023-08-23 14:36:15
    10.0.0.12  https://apache.org/index.html    2023-08-23 12:36:15
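
  4. Optionally, run a variation of the previous statement that keeps the most recent click per IP address instead of the earliest, by ordering the time attribute in descending order.

    SELECT ip_address, url, FROM_UNIXTIME(click_ts_raw) AS click_timestamp
    FROM (
            SELECT *,
              ROW_NUMBER() OVER (PARTITION BY ip_address
                ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(click_ts_raw)) DESC) AS rownum
            FROM clicks
         )
    WHERE rownum = 1;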