Use Client-Side Field Level Encryption with Confluent Cloud for Apache Flink
This guide shows you how to use client-side field level encryption (CSFLE) with Confluent Cloud for Apache Flink®. CSFLE enables you to encrypt specific fields in your messages before they are sent to the server, ensuring that sensitive information is protected even if the server is compromised.
In this walkthrough, you create a schema with encrypted fields, set up a Flink table to read the encrypted data, and run queries that transparently decrypt the data for processing.
Note
Client-side field level encryption with Flink is a Limited Availability feature in Confluent Cloud.
For limitations during the Limited Availability phase, see Limitations and known issues.
If you would like to participate in the Limited Availability Program, contact your Confluent account team.
For more information on Flink support for CSFLE, see Process Encrypted Data with Confluent Cloud for Apache Flink.
Confluent Cloud for Apache Flink also supports client-side payload encryption (CSPE), which encrypts the entire message payload. For more information on using CSPE, see Use Client-Side Payload Encryption.
Prerequisites
Access to Confluent Cloud
An environment with the Stream Governance Advanced package enabled
A provisioned Flink compute pool
An authorized user with the following roles:
DeveloperRead on the input topic
DeveloperWrite on the output topic
DeveloperRead on Schema Registry subjects for input and output topics
DeveloperWrite on the KEK for the first write to generate the Data Encryption Key
DeveloperRead on the KEK for subsequent reads and writes
DeveloperManage on the output topic and subject for creating output tables
The ARN for an encryption key generated in AWS KMS to use as the KEK for encryption rules
Step 1: Set up CSFLE on Confluent Cloud
Register the KEK in Confluent Cloud.
Log in to Confluent Cloud Console.
Navigate to the environment where you want to use CSFLE.
In the navigation menu, navigate to Schema Registry > Encryption Keys and click Add encryption key.
Name the key “csfle-key”.
In the Key management system provider dropdown, select AWS.
In the Amazon resource name (key ID) textbox, provide the ARN of the AWS KMS key.
Enable Share key access with Confluent Cloud to give Flink access to the KEK.
Click Add to save the key.
Step 2: Create a topic and schema with encrypted fields
Create a Flink table for encrypted customer data based on the built-in examples.marketplace.customers table, and tag sensitive fields as PII.
Create a table for encrypted customer data:
```sql
CREATE TABLE customers_encrypted (
  customer_id INT NOT NULL,
  customer_name VARCHAR,
  address STRING,
  postcode STRING,
  city STRING,
  email STRING,
  PRIMARY KEY (customer_id) NOT ENFORCED
);
```
This statement creates the customers_encrypted topic and registers its Avro schema in Schema Registry.

In Confluent Cloud Console, navigate to Stream Governance > Tags and verify that a PII tag exists. If it doesn't, click Add tag and create a tag named PII.

Navigate to Schema Registry > Data contracts and open the customers_encrypted-value subject.

Click Evolve, and in the editor, add the PII tag to the email and address fields. These tags identify the fields that encryption rules protect.

```json
{
  "fields": [
    { "default": null, "name": "customer_id", "type": ["null", "int"] },
    { "default": null, "name": "customer_name", "type": ["null", "string"] },
    { "default": null, "name": "address", "type": ["null", "string"], "confluent:tags": ["PII"] },
    { "default": null, "name": "postcode", "type": ["null", "string"] },
    { "default": null, "name": "city", "type": ["null", "string"] },
    { "default": null, "name": "email", "type": ["null", "string"], "confluent:tags": ["PII"] }
  ],
  "name": "customers_encrypted_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record"
}
```
Before the closing curly brace, add the following JSON to attach a domain rule that encrypts fields tagged with PII using the KEK you registered in Step 1. Ensure that you add a comma after the "type": "record" field in the previous snippet to maintain valid JSON syntax. For client-side payload encryption (CSPE), specify "type": "ENCRYPT_PAYLOAD".

```json
"domainRules": [
  {
    "name": "encrypt-pii",
    "kind": "TRANSFORM",
    "mode": "WRITEREAD",
    "type": "ENCRYPT",
    "tags": ["PII"],
    "params": { "encrypt.kek.name": "csfle-key" },
    "onFailure": "ERROR",
    "onSuccess": "NONE"
  }
]
```
Click Save to update the schema with the PII tags and domain rules.
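For reference, after you save, the subject's latest version combines the tagged fields and the rule set. The following is a sketch assembled from the two snippets above; your Schema Registry view may normalize field order or metadata differently:

```json
{
  "fields": [
    { "default": null, "name": "customer_id", "type": ["null", "int"] },
    { "default": null, "name": "customer_name", "type": ["null", "string"] },
    { "default": null, "name": "address", "type": ["null", "string"], "confluent:tags": ["PII"] },
    { "default": null, "name": "postcode", "type": ["null", "string"] },
    { "default": null, "name": "city", "type": ["null", "string"] },
    { "default": null, "name": "email", "type": ["null", "string"], "confluent:tags": ["PII"] }
  ],
  "name": "customers_encrypted_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record",
  "domainRules": [
    {
      "name": "encrypt-pii",
      "kind": "TRANSFORM",
      "mode": "WRITEREAD",
      "type": "ENCRYPT",
      "tags": ["PII"],
      "params": { "encrypt.kek.name": "csfle-key" },
      "onFailure": "ERROR",
      "onSuccess": "NONE"
    }
  ]
}
```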
Return to your workspace and run the following query to populate the table with data from the customers table:

```sql
INSERT INTO customers_encrypted SELECT * FROM `examples`.`marketplace`.`customers`;
```

This statement reads from the built-in customers table. The email and address fields are automatically encrypted according to the domain rules you attached in the previous step.
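To sanity-check the pipeline end to end, you can read a few rows back. Assuming your user has DeveloperRead on the KEK, a quick check looks like this:

```sql
-- With DeveloperRead on the KEK, email and address appear as plaintext;
-- without it, these columns show ciphertext instead.
SELECT customer_id, email, address
FROM customers_encrypted
LIMIT 10;
```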
Step 3: Query encrypted data
Run queries on the encrypted data. If you have DeveloperRead permission on the KEK, Flink transparently decrypts the encrypted fields for processing.
Example: Count customers by email domain
```sql
SELECT
  SUBSTRING(email, POSITION('@' IN email) + 1) AS domain,
  COUNT(*) AS customer_count
FROM customers_encrypted
GROUP BY SUBSTRING(email, POSITION('@' IN email) + 1);
```
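The SUBSTRING/POSITION expression can also be written with Flink's SPLIT_INDEX built-in function, which splits a string on a delimiter and returns the zero-based n-th segment. A sketch of an equivalent query:

```sql
-- SPLIT_INDEX(email, '@', 1) returns the segment after the '@',
-- i.e. the domain for addresses of the form local@domain.
SELECT
  SPLIT_INDEX(email, '@', 1) AS domain,
  COUNT(*) AS customer_count
FROM customers_encrypted
GROUP BY SPLIT_INDEX(email, '@', 1);
```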
In this query:
Flink decrypts the email field.
The SQL operations work with plaintext email addresses.
The results show aggregated counts by email domain.
Example: Filter by encrypted field
```sql
SELECT customer_id, email, city
FROM customers_encrypted
WHERE address LIKE '%New York%';
```
In this query:
Flink decrypts the address field for filtering.
The WHERE clause operates on plaintext address values.
Only matching rows are returned.
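You can also persist filtered results to a new table, which is why the prerequisites include DeveloperManage on the output topic and subject. The table name below (customers_new_york) is illustrative; note that the new subject does not inherit the source table's encryption rules, so tag its fields and attach rules as in Step 2 if the output must stay encrypted:

```sql
-- Hypothetical output table; requires DeveloperManage on the
-- output topic and subject (see Prerequisites).
CREATE TABLE customers_new_york AS
SELECT customer_id, email, city
FROM customers_encrypted
WHERE address LIKE '%New York%';
```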
What happens without key permissions
If you run Flink SQL statements without DeveloperRead permission on the encryption keys, the behavior depends on the encryption type:
- CSFLE without permissions
  Encrypted fields remain encrypted and pass through without decryption. Operations like COUNT(DISTINCT address) count distinct ciphertext values, not distinct addresses. For more information, see Process Encrypted Data with Confluent Cloud for Apache Flink.
- CSPE without permissions
  The Flink SQL statement fails because Flink cannot decrypt the payload.
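To illustrate the CSFLE case: without DeveloperRead on the KEK, a query like the following still runs, but it aggregates over ciphertext, so two customers at the same address can be counted separately if their ciphertexts differ:

```sql
-- Without KEK read access, 'address' holds ciphertext, so this
-- counts distinct ciphertext values rather than distinct addresses.
SELECT COUNT(DISTINCT address) AS distinct_addresses
FROM customers_encrypted;
```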
For a complete description of permission-based behavior, see Process Encrypted Data with Confluent Cloud for Apache Flink.