Mask Fields in a Table with Confluent Cloud for Apache Flink
With Confluent Cloud for Apache Flink®, you can generate a topic that contains masked fields from an input topic in only a few clicks.
In this guide, you create a Flink table and apply the Mask Fields action to generate a topic in which customer names are masked by a preconfigured regular expression. The Mask Fields action creates a Flink SQL statement for you, but no knowledge of Flink SQL is required to use it.
This guide shows the following steps:
- Step 1: Inspect the example stream
- Step 2: Create a source table
- Step 3: Apply the Mask Fields action
- Step 4: Inspect the output table
- Step 5: Stop the persistent query
Prerequisites
- Access to Confluent Cloud.
- The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, contact your OrganizationAdmin or EnvironmentAdmin. For more information, see Grant Role-Based Access in Confluent Cloud for Apache Flink.
- A provisioned Flink compute pool.
Step 1: Inspect the example stream

In this step, you query the read-only customers table in the examples.marketplace database to inspect the stream for fields that you can mask.
- Log in to Confluent Cloud and navigate to your Flink workspace.
- In the Use catalog dropdown, select your environment.
- In the Use database dropdown, select your Kafka cluster.

Run the following statement to inspect the example customers stream.

```sql
SELECT * FROM examples.marketplace.customers;
```
Your output should resemble:
| customer_id | name                  | address              | postcode | city          | email                        |
|-------------|-----------------------|----------------------|----------|---------------|------------------------------|
| 3134        | Dr. Andrew Terry      | 45488 Eileen Walk    | 78690    | Latoyiaberg   | romaine.lynch@hotmail.com    |
| 3243        | Miss Shelby Lueilwitz | 199 Bernardina Brook | 79991    | Johnburgh     | dominick.oconner@hotmail.c…  |
| 3027        | Korey Hand            | 655 Murray Turnpike  | 08917    | Port Sukshire | karlyn.ziemann@yahoo.com     |
| ...         |                       |                      |          |               |                              |
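To narrow the stream to candidate fields for masking, you can project only the columns that contain personal data. The following optional query is a minimal sketch of that check.

```sql
-- Optional: view only the fields that are candidates for masking.
SELECT name, email FROM examples.marketplace.customers;
```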
Step 2: Create a source table

In this step, you create a customers_source table for the data from the example customers stream. You use an INSERT INTO ... SELECT statement to populate the table with streaming data.
Run the following statement to register the customers_source table. Confluent Cloud for Apache Flink automatically creates a backing Kafka topic with the same name.

```sql
-- Register a customers source table.
CREATE TABLE customers_source (
  customer_id INT NOT NULL,
  name STRING,
  address STRING,
  postcode STRING,
  city STRING,
  email STRING,
  PRIMARY KEY(`customer_id`) NOT ENFORCED
);
```
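Optionally, you can confirm the schema of the new table before populating it by using the standard DESCRIBE statement.

```sql
-- Optional: verify the schema and primary key of the new table.
DESCRIBE customers_source;
```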
Run the following statement to populate the customers_source table with data from the example customers stream.

```sql
-- Persistent query to stream data from
-- the customers example stream to the
-- customers_source table.
INSERT INTO customers_source (
  customer_id,
  name,
  address,
  postcode,
  city,
  email
)
SELECT
  customer_id,
  name,
  address,
  postcode,
  city,
  email
FROM examples.marketplace.customers;
```
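The INSERT INTO statement is a persistent query: it continues to run and append rows as new events arrive. As an optional sanity check, a streaming aggregation like the following returns a continuously updating row count.

```sql
-- Optional: watch the row count grow as the persistent query
-- copies records into customers_source.
SELECT COUNT(*) AS row_count FROM customers_source;
```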
Run the following statement to inspect the customers_source table.

```sql
SELECT * FROM customers_source;
```
Your output should resemble:
| customer_id | name          | address              | postcode | city            | email                         |
|-------------|---------------|----------------------|----------|-----------------|-------------------------------|
| 3088        | Phil Grimes   | 07738 Zieme Court    | 84845    | Port Dillontown | garnett.abernathy@hotmail.com |
| 3022        | Jeana Gaylord | 021 Morgan Drives    | 35160    | West Celena     | emile.daniel@gmail.com        |
| 3097        | Lily Ryan     | 671 Logan Throughway | 58261    | Dickinsonburgh  | ivory.lockman@gmail.com       |
| ...         |               |                      |          |                 |                               |
Step 3: Apply the Mask Fields action

In the previous step, you created a Flink table that had rows with customer names, which might be confidential data. In this step, you apply the Mask Fields action to create an output table that has the contents of the name field masked.
- Navigate to the Environments page, and in the navigation menu, click Data portal.
- On the Data portal page, click the dropdown menu and select the environment for your workspace.
- In the Recently created section, find your customers_source topic and click it to open the details pane.
- Click Actions, and in the Actions list, click Mask fields to open the Mask fields dialog.
- In the Field to mask dropdown, select name.
- In the Regex for name dropdown, select Word characters.
- In the Runtime configuration section, either select an existing service account or create a new one for the current action.
Note
The service account you select must have the EnvironmentAdmin role to create topics and schemas and to run Flink statements.
Optionally, click the Show SQL toggle to view the statements that the action will run.
The code resembles:

```sql
CREATE TABLE `<your-environment>`.`<your-kafka-cluster>`.`customers_source_mask`
LIKE `<your-environment>`.`<your-kafka-cluster>`.`customers_source`;

INSERT INTO `<your-environment>`.`<your-kafka-cluster>`.`customers_source_mask`
SELECT
  `customer_id`,
  REGEXP_REPLACE(`name`, '(\w)', '*') AS `name`,
  address,
  postcode,
  city,
  email
FROM `<your-environment>`.`<your-kafka-cluster>`.`customers_source`;
```
Click Confirm.
The action runs the CREATE TABLE and INSERT INTO statements. These statements register the customers_source_mask table and populate it with rows from the customers_source table. The strings in the name column are masked by the REGEXP_REPLACE function.
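To see how the masking expression behaves on a single value, you can run REGEXP_REPLACE on a string literal. The '(\w)' pattern matches every word character (letters, digits, and underscores) and replaces it with an asterisk, while punctuation and spaces pass through unchanged.

```sql
-- Each word character is replaced by '*', so the literal
-- 'Dr. Andrew Terry' becomes '**. ****** *****'.
SELECT REGEXP_REPLACE('Dr. Andrew Terry', '(\w)', '*') AS masked_name;
```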
Step 4: Inspect the output table

The statements that were generated by the Mask Fields action created an output table named customers_source_mask. In this step, you query the output table to see the masked field values.
Return to your workspace and run the following statement to inspect the customers_source_mask output table.

```sql
SELECT * FROM customers_source_mask;
```
Your output should resemble:
| customer_id | name                 | address            | postcode | city            | email                        |
|-------------|----------------------|--------------------|----------|-----------------|------------------------------|
| 3104        | **** *** ******      | 342 Odis Hollow    | 27615    | West Florentino | bryce.hodkiewicz@hotmail.c…  |
| 3058        | **** ******* ******  | 33569 Turner Glens | 14107    | Schummchester   | sarah.roob@yahoo.com         |
| 3138        | **** ****** ******** | 944 Elden Walks    | 39293    | New Ernestbury  | velvet.volkman@gmail.com     |
| ...         |                      |                    |          |                 |                              |
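Note that the email column still contains identifying data. As an optional extension beyond what the Mask Fields action generates, a similar expression can mask the local part of each address; the lookahead pattern below is an assumption based on the Java regular expressions that Flink uses.

```sql
-- Sketch: mask word characters that appear before the '@',
-- e.g. 'emile.daniel@gmail.com' becomes '*****.******@gmail.com'.
SELECT
  customer_id,
  REGEXP_REPLACE(email, '\w(?=[^@]*@)', '*') AS email_masked
FROM customers_source;
```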
Step 5: Stop the persistent query
The INSERT INTO statement that was created by the Mask Fields action runs continuously until you stop it manually. Free resources in your compute pool by deleting the long-running statement.
- Navigate to the Flink page in your environment and click Flink statements.
- In the statements list, find the statement that has a status of Running.
- In the Actions column, click … and select Delete statement.
- In the Confirm statement deletion dialog, copy and paste the statement name and click Confirm.
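If you also want to remove the example tables that you created in this guide, you can drop them with Flink SQL. This optional cleanup is a sketch that assumes you no longer need the data; dropping a table may also remove its backing Kafka topic, so confirm before running.

```sql
-- Optional cleanup. Assumption: the tables and their data
-- are no longer needed.
DROP TABLE customers_source_mask;
DROP TABLE customers_source;
```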