Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
Confluent Cloud for Apache Flink®️ enables removing duplicate rows over a set of columns in a Flink SQL table.
SELECT [column_list]
FROM (
  SELECT [column_list],
    ROW_NUMBER() OVER ([PARTITION BY column1[, column2...]]
      ORDER BY time_attr [asc|desc]) AS rownum
  FROM table_name)
WHERE rownum = 1
This query pattern must be followed exactly. Otherwise, the optimizer can’t translate the query.
ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
PARTITION BY column1[, column2...]: Specifies the partition columns, which form the deduplication key.
ORDER BY time_attr [asc|desc]: Specifies the ordering column, which must be a time attribute. Flink SQL supports the processing time attribute and the event time attribute. Ordering by ASC keeps the first row, and ordering by DESC keeps the last row.
WHERE rownum = 1: The rownum = 1 condition is required for Flink SQL to recognize that the query is a deduplication.
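As a minimal sketch of the pattern with an event-time attribute, the following statement keeps the first row seen for each key. The `orders` table and its `order_id` and `amount` columns are hypothetical names used only for illustration, and the sketch assumes that the table exposes the `$rowtime` system column as its event-time attribute.

```sql
-- Hypothetical table: orders(order_id, amount). Names are illustrative only.
SELECT order_id, amount
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id      -- deduplication key
      ORDER BY `$rowtime` ASC    -- assumed event-time attribute; ASC keeps the first row
    ) AS rownum
  FROM orders
)
WHERE rownum = 1;
```

Changing ASC to DESC in the same statement would keep the last row for each `order_id` instead.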
Deduplication removes duplicate rows over a set of columns, keeping only the first or last row.
Flink SQL uses the ROW_NUMBER() function to remove duplicates, similar to its usage in Top-N Queries. Deduplication is a special case of the Top-N query, in which N is 1 and row order is by processing time or event time.
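For comparison, a general Top-N query has the same shape, but the rank filter admits more than one row and the ordering column doesn't have to be a time attribute. The following is a sketch only: the `shop_sales` table and its columns are hypothetical.

```sql
-- Hypothetical table: shop_sales(product_id, category, sales).
-- Returns the three best-selling products in each category.
SELECT product_id, category, sales
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY category
      ORDER BY sales DESC        -- any sortable column, not only a time attribute
    ) AS row_num
  FROM shop_sales
)
WHERE row_num <= 3;              -- N = 3; deduplication is the N = 1 case over a time attribute
```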
In some cases, an upstream ETL job isn’t end-to-end exactly-once, which may cause duplicate records in the sink after a failover. Duplicate records affect the correctness of downstream analytical jobs, so deduplication is required before further analysis can continue.
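To illustrate why deduplication comes first, consider a sum over records that an upstream job may have written more than once. The sketch below deduplicates in an inner query and aggregates only the surviving rows. The `payments` table and its columns are hypothetical, and `$rowtime` is assumed to be available as the table's event-time attribute.

```sql
-- Hypothetical table: payments(payment_id, customer_id, amount).
-- A payment replayed after a failover would otherwise be counted twice in the sum.
SELECT customer_id, SUM(amount) AS total_amount
FROM (
  -- Inner query: keep the first row seen for each payment_id.
  SELECT customer_id, amount
  FROM (
    SELECT *,
      ROW_NUMBER() OVER (
        PARTITION BY payment_id
        ORDER BY `$rowtime` ASC
      ) AS rownum
    FROM payments
  )
  WHERE rownum = 1
)
GROUP BY customer_id;
```

Keeping the deduplication in its own subquery leaves the required query pattern intact, so the aggregation doesn't interfere with how the optimizer recognizes it.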
In the Flink SQL shell or in a Cloud Console workspace, run the following commands to see an example of row deduplication. The rows are deduplicated by the ip_address column and ordered by the click_ts_raw column, which is a UNIX timestamp, so the earliest click for each IP address is kept.
Create a table for web page click events.
CREATE TABLE clicks (
  ip_address VARCHAR,
  url VARCHAR,
  click_ts_raw BIGINT
);
Populate the table with mock clickstream data.
INSERT INTO clicks
VALUES
  ('10.0.0.1',  'https://acme.com/index.html',      1692812175),
  ('10.0.0.12', 'https://apache.org/index.html',    1692826575),
  ('10.0.0.13', 'https://confluent.io/index.html',  1692826575),
  ('10.0.0.1',  'https://acme.com/index.html',      1692812175),
  ('10.0.0.12', 'https://apache.org/index.html',    1692819375),
  ('10.0.0.13', 'https://confluent.io/index.html',  1692826575);
Run the following statement to return the deduplicated rows.
SELECT ip_address, url, FROM_UNIXTIME(click_ts_raw) AS click_timestamp
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY ip_address
      ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(click_ts_raw))
    ) AS rownum
  FROM clicks
)
WHERE rownum = 1;
Your output should resemble:
ip_address  url                              click_timestamp
10.0.0.1    https://acme.com/index.html      2023-08-23 10:36:15
10.0.0.13   https://confluent.io/index.html  2023-08-23 14:36:15
10.0.0.12   https://apache.org/index.html    2023-08-23 12:36:15
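As a variation on the statement above, adding DESC to the ORDER BY clause should keep the latest click for each IP address instead of the earliest. This is a sketch against the same clicks table, not a separately verified run.

```sql
-- Same clicks table as above; only the sort direction changes.
SELECT ip_address, url, FROM_UNIXTIME(click_ts_raw) AS click_timestamp
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY ip_address
      ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(click_ts_raw)) DESC  -- DESC keeps the last row
    ) AS rownum
  FROM clicks
)
WHERE rownum = 1;
```

With the mock data, only 10.0.0.12 has two different timestamps, so its retained row would be the one with click_ts_raw 1692826575 rather than 1692819375. The rows for 10.0.0.1 and 10.0.0.13 carry identical timestamps, so their results would not change.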