Deduplication Queries

Confluent Cloud for Apache Flink®️ enables removing duplicate rows over a set of columns in a Flink SQL table.

Syntax

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY column1[, column2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

Parameter Specification

Note

This query pattern must be followed exactly, otherwise, the optimizer can’t translate the query.

  • ROW_NUMBER(): Assigns an unique, sequential number to each row, starting with one.
  • PARTITION BY column1[, column2...]: Specifies the partition columns by the deduplicate key.
  • ORDER BY time_attr [asc|desc]: Specifies the ordering column, which must be a time attribute. Flink SQL supports the processing time attribute and the event time attribute. Ordering by ASC means keeping the first row, ordering by DESC means keeping the last row.
  • WHERE rownum = 1: The rownum = 1 is required for Flink SQL to recognize the query is deduplication.

Description

Deduplication removes duplicate rows over a set of columns, keeping only the first or last row.

Flink SQL uses the ROW_NUMBER() function to remove duplicates, similar to its usage in Top-N Queries. Deduplication is a special case of the Top-N query, in which N is 1 and row order is by processing time or event time.

In some cases, an upstream ETL job isn’t end-to-end exactly-once, which may cause duplicate records in the sink, in case of failover. Duplicate records affect the correctness of downstream analytical jobs, like SUM and COUNT, so deduplication is required before further analysis can continue.

See deduplication in action

Apply the Deduplicate Topic action to generate a table that contains only unique records from an input table.

Example

In the Flink SQL shell or in a Cloud Console workspace, run the following statement to see an example of row deduplication. It returns the first URL that the customer has visited. The rows are deduplicated by the $rowtime column, which is the system column mapped to the Kafka record timestamp and can be either LogAppendTime or CreateTime.

  1. Run the following statement to return the deduplicated rows.

    SELECT user_id, url, $rowtime
    FROM (
       SELECT *, $rowtime,
         ROW_NUMBER() OVER (PARTITION BY user_id
           ORDER BY $rowtime ASC) AS rownum
       FROM `examples`.`marketplace`.`clicks`)
    WHERE rownum = 1;
    

    Your output should resemble:

    user_id    url                                  $rowtime
    3246       https://www.acme.com/product/upmtv   2024-04-16 08:04:47.365
    4028       https://www.acme.com/product/jtahp   2024-04-16 08:04:47.367
    4549       https://www.acme.com/product/ixsir   2024-04-16 08:04:47.367