Deduplicate Rows in a Table with Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® lets you generate a table that contains only the unique records from an input table with only a few clicks.
In this guide, you create a Flink table and apply the Deduplicate Topic action, which uses a deduplication statement to generate a topic that contains only unique records. The action creates the Flink SQL statements for you, so no knowledge of Flink SQL is required to use it.
This guide shows the following steps:
- Step 1: Create a users table
- Step 2: Apply the Deduplicate Topic action
- Step 3: Inspect the output table
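Under the hood, the Deduplicate Topic action generates statements built on Flink SQL's deduplication pattern: a ROW_NUMBER() function partitioned by the column that identifies a duplicate, keeping only the first row per partition. The following is a minimal sketch of that pattern, assuming a hypothetical table named t with a deduplication column id and a value column v; the action generates the real statements for your topic in Step 2.

```sql
-- Keep only the first row that arrives for each id.
-- `t`, `id`, and `v` are placeholders for illustration only.
SELECT id, v
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY $rowtime ASC) AS row_num
  FROM t
)
WHERE row_num = 1;
```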
Prerequisites
- Access to Confluent Cloud.
- The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, contact your OrganizationAdmin or EnvironmentAdmin. For more information, see Grant Role-Based Access in Confluent Cloud for Apache Flink.
- A provisioned Flink compute pool.
Step 1: Create a users table
Log in to Confluent Cloud and navigate to your Flink workspace.
Run the following statement to create a users table.

```sql
CREATE TABLE users (
  user_id STRING NOT NULL,
  registertime BIGINT,
  gender STRING,
  regionid STRING
);
```
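Optionally, confirm the schema that was registered. This check isn't required by the guide; it's just a quick way to verify that the CREATE TABLE statement succeeded.

```sql
-- Optional: inspect the schema of the new table.
DESCRIBE users;
```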
Insert rows with mock data into the users table.

```sql
-- Populate the table with mock users data.
-- Duplicate rows are removed in a later step.
INSERT INTO users VALUES
  ('Thomas A. Anderson', 1677260724, 'male', 'Region_4'),
  ('Thomas A. Anderson', 1677260724, 'male', 'Region_4'),
  ('Trinity', 1677260733, 'female', 'Region_4'),
  ('Trinity', 1677260733, 'female', 'Region_4'),
  ('Morpheus', 1677260742, 'male', 'Region_8'),
  ('Morpheus', 1677260742, 'male', 'Region_8'),
  ('Dozer', 1677260823, 'male', 'Region_1'),
  ('Agent Smith', 1677260955, 'male', 'Region_0'),
  ('Persephone', 1677260901, 'female', 'Region_2'),
  ('Niobe', 1677260921, 'female', 'Region_3'),
  ('Niobe', 1677260921, 'female', 'Region_3'),
  ('Niobe', 1677260921, 'female', 'Region_3'),
  ('Zee', 1677260922, 'female', 'Region_5');
```
Inspect the inserted rows.

```sql
SELECT * FROM users;
```
Your output should resemble:
```
user_id             registertime  gender  regionid
Thomas A. Anderson  1677260724    male    Region_4
Thomas A. Anderson  1677260724    male    Region_4
Trinity             1677260733    female  Region_4
Trinity             1677260733    female  Region_4
Morpheus            1677260742    male    Region_8
Morpheus            1677260742    male    Region_8
Dozer               1677260823    male    Region_1
Agent Smith         1677260955    male    Region_0
Persephone          1677260901    female  Region_2
Niobe               1677260921    female  Region_3
Niobe               1677260921    female  Region_3
Niobe               1677260921    female  Region_3
Zee                 1677260922    female  Region_5
```
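Before applying the action, you can optionally confirm which rows are duplicated with a grouped count. This query is an aside for verification only, not one of the statements the action generates:

```sql
-- Optional: list the user_ids that appear more than once.
SELECT user_id, COUNT(*) AS row_count
FROM users
GROUP BY user_id
HAVING COUNT(*) > 1;
```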
Step 2: Apply the Deduplicate Topic action
In the previous step, you created a Flink table that had duplicate rows. In this step, you apply the Deduplicate Topic action to create an output table that has only unique rows.
Navigate to the Environments page, and in the navigation menu, click Data portal.
In the Data portal page, click the dropdown menu and select the environment for your workspace.
In the Recently created section, find your users topic and click it to open the details pane.
Click Actions, and in the Actions list, click Deduplicate topic to open the Deduplicate topic dialog.
(Optional) In the Runtime configuration section, select Run with a service account to run the deduplicate query with a service account principal. Use this option for production queries.
Note
The service account you select must have the EnvironmentAdmin role to create topics, schemas, and run Flink statements.
Click the Show SQL toggle to view the statements that the action will run.
Flink uses the deduplication field as the output message key. Because the deduplication statement's DISTRIBUTED BY clause determines the output topic's key, the output topic's row key may differ from the input topic's row key.
For this example, the output message key is the user_id field.

```sql
DESC `<your-environment>`.`<your-kafka-cluster>`.`users`;

CREATE TABLE `<your-environment>`.`<your-kafka-cluster>`.`users_deduplicate` (
  PRIMARY KEY (`user_id`) NOT ENFORCED
) DISTRIBUTED BY HASH(`user_id`)
WITH (
  'changelog.mode' = 'upsert',
  'value.format' = 'avro-registry',
  'key.format' = 'avro-registry',
  'key.fields-prefix' = ''
);

INSERT INTO `<your-environment>`.`<your-kafka-cluster>`.`users_deduplicate`
SELECT `user_id`, `key`, `registertime`, `gender`, `regionid`
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY `user_id` ORDER BY $rowtime ASC) AS row_num
  FROM `<your-environment>`.`<your-kafka-cluster>`.`users`
)
WHERE row_num = 1;
```
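The ORDER BY $rowtime ASC clause in the generated query keeps the first row that arrives for each user_id. Flink's deduplication pattern also supports ordering in descending time order, which keeps the most recent row instead. The following variant is a sketch you could run manually in place of the generated INSERT INTO statement; it assumes the same placeholder names as above:

```sql
-- Variant: keep the most recent row per user_id instead of the first.
INSERT INTO `<your-environment>`.`<your-kafka-cluster>`.`users_deduplicate`
SELECT `user_id`, `key`, `registertime`, `gender`, `regionid`
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY `user_id` ORDER BY $rowtime DESC) AS row_num
  FROM `<your-environment>`.`<your-kafka-cluster>`.`users`
)
WHERE row_num = 1;
```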
Click Confirm.
The action runs the CREATE TABLE and INSERT INTO statements. These statements register the users_deduplicate table and populate it with rows from the users table by using a deduplication query.
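Optionally, you can confirm that the new table was registered by viewing its DDL. This is an extra check, not part of the action's output:

```sql
-- Optional: view the registered table definition.
SHOW CREATE TABLE users_deduplicate;
```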
Step 3: Inspect the output table
The statements generated by the Deduplicate Topic action created an output table named users_deduplicate. In this step, you query the output table to see the deduplicated rows.
Return to your workspace and run the following command to inspect the users_deduplicate output table.

```sql
SELECT * FROM users_deduplicate;
```
Your output should resemble:
```
user_id             registertime  gender  regionid
Thomas A. Anderson  1677260724    male    Region_4
Trinity             1677260733    female  Region_4
Morpheus            1677260742    male    Region_8
Dozer               1677260823    male    Region_1
Agent Smith         1677260955    male    Region_0
Persephone          1677260901    female  Region_2
Niobe               1677260921    female  Region_3
Zee                 1677260922    female  Region_5
```
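As a final optional check, you can verify that each key now appears exactly once in the output table. Every row_count in the result of this query should be 1:

```sql
-- Optional: confirm there are no remaining duplicates.
SELECT user_id, COUNT(*) AS row_count
FROM users_deduplicate
GROUP BY user_id;
```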