Deduplicate Rows in a Table with Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® enables you to generate a table that contains only the unique records from an input table, in just a few clicks.

In this guide, you create a Flink table and apply the Deduplicate Topic action, which uses a deduplication statement to generate a topic that contains only unique records. The Deduplicate Topic action creates the Flink SQL statements for you, so no knowledge of Flink SQL is required to use it.

This guide shows the following steps:

  • Step 1: Create a users table
  • Step 2: Apply the Deduplicate Topic action
  • Step 3: Inspect the output table

Prerequisites

  • Access to Confluent Cloud.
  • The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, contact your OrganizationAdmin or EnvironmentAdmin. For more information, see Grant Role-Based Access in Confluent Cloud for Apache Flink.
  • A provisioned Flink compute pool.

Step 1: Create a users table

  1. Log in to Confluent Cloud and navigate to your Flink workspace.

  2. Run the following statement to create a users table.

    CREATE TABLE users (
      user_id STRING NOT NULL,
      registertime BIGINT,
      gender STRING,
      regionid STRING
    );
    
  3. Insert rows with mock data into the users table.

    -- Populate the table with mock users data.
    -- Duplicate rows are removed in a later step.
    INSERT INTO users VALUES
      ('Thomas A. Anderson', 1677260724, 'male', 'Region_4'),
      ('Thomas A. Anderson', 1677260724, 'male', 'Region_4'),
      ('Trinity', 1677260733, 'female', 'Region_4'),
      ('Trinity', 1677260733, 'female', 'Region_4'),
      ('Morpheus', 1677260742, 'male', 'Region_8'),
      ('Morpheus', 1677260742, 'male', 'Region_8'),
      ('Dozer', 1677260823, 'male', 'Region_1'),
      ('Agent Smith', 1677260955, 'male', 'Region_0'),
      ('Persephone', 1677260901, 'female', 'Region_2'),
      ('Niobe', 1677260921, 'female', 'Region_3'),
      ('Niobe', 1677260921, 'female', 'Region_3'),
      ('Niobe', 1677260921, 'female', 'Region_3'),
      ('Zee', 1677260922, 'female', 'Region_5');
    
  4. Inspect the inserted rows. An aggregate query that counts the duplicates appears after these steps.

    SELECT * FROM users;
    

    Your output should resemble:

    user_id            registertime gender regionid
    Thomas A. Anderson 1677260724   male   Region_4
    Thomas A. Anderson 1677260724   male   Region_4
    Trinity            1677260733   female Region_4
    Trinity            1677260733   female Region_4
    Morpheus           1677260742   male   Region_8
    Morpheus           1677260742   male   Region_8
    Dozer              1677260823   male   Region_1
    Agent Smith        1677260955   male   Region_0
    Persephone         1677260901   female Region_2
    Niobe              1677260921   female Region_3
    Niobe              1677260921   female Region_3
    Niobe              1677260921   female Region_3
    Zee                1677260922   female Region_5
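
As a quick sanity check before deduplicating, you can count how often each distinct row appears. This query is not part of the Deduplicate Topic action; it is a minimal sketch that groups on every column of the users table you created above. Because it is a streaming aggregation, the result updates continuously, so stop the statement once you have inspected the counts.

    -- Surface only the rows that occur more than once.
    SELECT user_id, registertime, gender, regionid, COUNT(*) AS occurrences
    FROM users
    GROUP BY user_id, registertime, gender, regionid
    HAVING COUNT(*) > 1;

The rows for Thomas A. Anderson, Trinity, and Morpheus each report 2 occurrences, and the Niobe row reports 3.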
    

Step 2: Apply the Deduplicate Topic action

In the previous step, you created a Flink table that had duplicate rows. In this step, you apply the Deduplicate Topic action to create an output table that has only unique rows.

  1. Navigate to the Environments page, and in the navigation menu, click Data portal.

  2. On the Data portal page, use the dropdown menu to select the environment for your workspace.

  3. In the Recently created section, find your users topic and click it to open the details pane.

  4. Click Actions, and in the Actions list, click Deduplicate topic to open the Deduplicate topic dialog.

  5. (Optional) In the Runtime configuration section, select Run with a service account to run the deduplication statements with a service account principal. Use this option for production queries.

    Note

    The service account you select must have the EnvironmentAdmin role to create topics, schemas, and run Flink statements.

  6. Click the Show SQL toggle to view the statements that the action will run.

    Flink uses the deduplication field as the output message key: the DISTRIBUTED BY clause in the generated CREATE TABLE statement determines the output topic’s key, so it may differ from the input topic’s key. In the INSERT INTO statement, the ROW_NUMBER() function partitions rows by this field and keeps only the first row per key, ordered by the `$rowtime` system column.

    For this example, the output message key is the user_id field.

    DESC `<your-environment>`.`<your-kafka-cluster>`.`users`;
    
    CREATE TABLE `<your-environment>`.`<your-kafka-cluster>`.`users_deduplicate` (
           PRIMARY KEY (`user_id`) NOT ENFORCED
    ) DISTRIBUTED BY HASH(
           `user_id`
    ) WITH (
           'changelog.mode' = 'upsert',
           'value.format' = 'avro-registry',
           'key.format' = 'avro-registry',
           'key.fields-prefix' = ''
    );
    
    INSERT INTO `<your-environment>`.`<your-kafka-cluster>`.`users_deduplicate` SELECT
      `user_id`, `key`, `registertime`, `gender`, `regionid` FROM (
         SELECT *,
                ROW_NUMBER() OVER (PARTITION BY `user_id` ORDER BY `$rowtime` ASC) AS row_num
           FROM `<your-environment>`.`<your-kafka-cluster>`.`users`) WHERE row_num = 1;
    
  7. Click Confirm.

    The action runs the CREATE TABLE and INSERT INTO statements. These statements register the users_deduplicate table and populate it with rows from the users table using a deduplication query. A variant of the query that keeps the latest row per key instead of the earliest appears after these steps.
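
The generated INSERT INTO statement keeps the first row that Flink observes for each key, because ROW_NUMBER() orders rows by `$rowtime` in ascending order. If you want the most recent row per key instead, you can order in descending order. The following statement is a hypothetical variant, not one that the action generates; it assumes the same users_deduplicate table and column list as the statements above, and the table’s upsert changelog mode means later rows replace earlier ones in the output topic.

    -- Hypothetical variant: ORDER BY `$rowtime` DESC keeps the latest row
    -- per key instead of the earliest one.
    INSERT INTO `<your-environment>`.`<your-kafka-cluster>`.`users_deduplicate` SELECT
      `user_id`, `key`, `registertime`, `gender`, `regionid` FROM (
         SELECT *,
                ROW_NUMBER() OVER (PARTITION BY `user_id` ORDER BY `$rowtime` DESC) AS row_num
           FROM `<your-environment>`.`<your-kafka-cluster>`.`users`) WHERE row_num = 1;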

Step 3: Inspect the output table

The statements generated by the Deduplicate Topic action created an output table named users_deduplicate. In this step, you query the output table to see the deduplicated rows.

  • Return to your workspace and run the following statement to inspect the users_deduplicate output table. A statement that verifies the table’s key layout appears after this step.

    SELECT * FROM users_deduplicate;
    

    Your output should resemble:

    user_id            registertime gender regionid
    Thomas A. Anderson 1677260724   male   Region_4
    Trinity            1677260733   female Region_4
    Morpheus           1677260742   male   Region_8
    Dozer              1677260823   male   Region_1
    Agent Smith        1677260955   male   Region_0
    Persephone         1677260901   female Region_2
    Niobe              1677260921   female Region_3
    Zee                1677260922   female Region_5
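
To confirm that the output table is keyed as described in Step 2, you can also inspect its full definition. SHOW CREATE TABLE prints the complete DDL, including the PRIMARY KEY, the DISTRIBUTED BY clause, and the connector options that the action set:

    -- Print the full DDL of the deduplicated output table.
    SHOW CREATE TABLE users_deduplicate;

The output should resemble the CREATE TABLE statement from Step 2, with PRIMARY KEY (`user_id`) NOT ENFORCED, DISTRIBUTED BY HASH(`user_id`), and 'changelog.mode' = 'upsert'.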