Use Client-Side Field Level Encryption with Confluent Cloud for Apache Flink

This guide shows you how to use client-side field level encryption (CSFLE) with Confluent Cloud for Apache Flink®. CSFLE enables you to encrypt specific fields in your messages before they are sent to the server, ensuring that sensitive information is protected even if the server is compromised.

This walkthrough shows you how to process encrypted data using Confluent Cloud for Apache Flink with CSFLE. You create a schema with encrypted fields, set up a Flink table to read the encrypted data, and run queries that transparently decrypt the data for processing.

Note

Client-side field level encryption with Flink is a Limited Availability feature in Confluent Cloud.

For limitations during the Limited Availability phase, see Limitations and known issues.

If you would like to participate in the Limited Availability Program, contact your Confluent account team.

For more information on Flink support for CSFLE, see Process Encrypted Data with Confluent Cloud for Apache Flink.

Confluent Cloud for Apache Flink also supports client-side payload encryption (CSPE), which encrypts the entire message payload. For more information on using CSPE, see Use Client-Side Payload Encryption.

Prerequisites

  • Access to Confluent Cloud

  • An environment with the Stream Governance Advanced package enabled

  • A provisioned Flink compute pool

  • An authorized user with the following role bindings for topic, Schema Registry, and KEK access:

    • DeveloperRead on the input topic

    • DeveloperWrite on the output topic

    • DeveloperRead on Schema Registry subjects for input and output topics

    • DeveloperWrite on the KEK for the first write, which generates the Data Encryption Key (DEK)

    • DeveloperRead on the KEK for subsequent reads and writes

    • DeveloperManage on the output topic and subject for creating output tables

  • The ARN for an encryption key generated in AWS KMS to use as the KEK for encryption rules
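If you do not have a KMS key yet, you can create one and capture its ARN with the AWS CLI. This is a sketch of the setup step only; the key policy that grants Confluent access is not shown, and the description text is a placeholder:

```shell
# Create a symmetric KMS key to serve as the KEK and print only its ARN.
aws kms create-key \
  --description "KEK for Confluent CSFLE" \
  --query KeyMetadata.Arn \
  --output text
# Prints an ARN like: arn:aws:kms:us-east-1:123456789012:key/<key-id>
```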

Step 1: Set up CSFLE on Confluent Cloud

Register the KEK in Confluent Cloud.

  1. Log in to Confluent Cloud Console.

  2. Navigate to the environment where you want to use CSFLE.

  3. In the navigation menu, go to Schema Registry > Encryption Keys and click Add encryption key.

  4. Name the key “csfle-key”.

  5. In the Key management system provider dropdown, select AWS.

  6. In the Amazon resource name (key ID) textbox, provide the ARN of the AWS KMS key.

  7. Enable Share key access with Confluent Cloud to give Flink access to the KEK.

  8. Click Add to save the key.
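The Console steps above can also be performed programmatically against the Schema Registry DEK Registry REST API. The sketch below only builds and prints the registration payload; the endpoint path and field names reflect my understanding of that API and should be verified against the current reference, and the ARN is a placeholder:

```python
import json

# KEK registration payload for POST {schema_registry_url}/dek-registry/v1/keks
# (field names assumed from the DEK Registry API; verify before use).
kek = {
    "name": "csfle-key",   # must match encrypt.kek.name in the encryption rule
    "kmsType": "aws-kms",  # AWS KMS as the key management system provider
    "kmsKeyId": "arn:aws:kms:us-east-1:123456789012:key/EXAMPLE",  # placeholder ARN
    "shared": True,        # equivalent to "Share key access with Confluent Cloud"
}
body = json.dumps(kek)

# To send the request (requires a Schema Registry API key and secret):
# import requests
# requests.post(f"{sr_url}/dek-registry/v1/keks", data=body,
#               auth=(api_key, api_secret),
#               headers={"Content-Type": "application/json"})
print(body)
```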

Step 2: Create a topic and schema with encrypted fields

Create a Flink table for encrypted customer data based on the built-in examples.marketplace.customers table, and tag sensitive fields as PII.

  1. Create a table for encrypted customer data:

    CREATE TABLE customers_encrypted (
      customer_id INT NOT NULL,
      customer_name VARCHAR,
      address STRING,
      postcode STRING,
      city STRING,
      email STRING,
      PRIMARY KEY (customer_id) NOT ENFORCED
    );
    

    This statement creates the customers_encrypted topic and registers its Avro schema in Schema Registry.

  2. In Confluent Cloud Console, navigate to Stream Governance > Tags and verify that a PII tag exists. If it doesn’t, click Add tag and create a tag named PII.

  3. Navigate to Schema Registry > Data contracts and open the customers_encrypted-value subject.

  4. Click Evolve, and in the editor, add the PII tag to the email and address fields. These tags identify the fields that encryption rules protect.

    {
      "fields": [
        {
          "default": null,
          "name": "customer_id",
          "type": ["null", "int"]
        },
        {
          "default": null,
          "name": "customer_name",
          "type": ["null", "string"]
        },
        {
          "default": null,
          "name": "address",
          "type": ["null", "string"],
          "confluent:tags": ["PII"]
        },
        {
          "default": null,
          "name": "postcode",
          "type": ["null", "string"]
        },
        {
          "default": null,
          "name": "city",
          "type": ["null", "string"]
        },
        {
          "default": null,
          "name": "email",
          "type": ["null", "string"],
          "confluent:tags": ["PII"]
        }
      ],
      "name": "customers_encrypted_value",
      "namespace": "org.apache.flink.avro.generated.record",
      "type": "record"
    }
    
  5. Before the closing curly brace, add the following JSON to attach a domain rule that encrypts fields tagged with PII using the KEK you registered in Step 1. Ensure that you add a comma after the "type": "record" field in the previous snippet to maintain valid JSON syntax.

    For client-side payload encryption (CSPE), specify "type": "ENCRYPT_PAYLOAD".

    "domainRules": [
      {
        "name": "encrypt-pii",
        "kind": "TRANSFORM",
        "mode": "WRITEREAD",
        "type": "ENCRYPT",
        "tags": ["PII"],
        "params": {
          "encrypt.kek.name": "csfle-key"
        },
        "onFailure": "ERROR",
        "onSuccess": "NONE"
      }
    ]
    
  6. Click Save to update the schema with the PII tags and domain rules.

  7. Return to your workspace and run the following query to populate the table with data from the customers table:

    INSERT INTO customers_encrypted
      SELECT *
      FROM `examples`.`marketplace`.`customers`;
    

    This statement reads from the built-in customers table. The email and address fields are automatically encrypted according to the domain rules you attached in the previous step.
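Before saving the edited data contract, you can sanity-check the combined JSON locally. This sketch merges the domain-rule snippet into the schema exactly as shown in the steps above and confirms the result parses, which catches the comma-placement mistake the instructions warn about:

```python
import json

# The schema from step 4 with the domainRules from step 5 appended.
schema_with_rules = """
{
  "fields": [
    {"default": null, "name": "customer_id", "type": ["null", "int"]},
    {"default": null, "name": "customer_name", "type": ["null", "string"]},
    {"default": null, "name": "address", "type": ["null", "string"], "confluent:tags": ["PII"]},
    {"default": null, "name": "postcode", "type": ["null", "string"]},
    {"default": null, "name": "city", "type": ["null", "string"]},
    {"default": null, "name": "email", "type": ["null", "string"], "confluent:tags": ["PII"]}
  ],
  "name": "customers_encrypted_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record",
  "domainRules": [
    {
      "name": "encrypt-pii",
      "kind": "TRANSFORM",
      "mode": "WRITEREAD",
      "type": "ENCRYPT",
      "tags": ["PII"],
      "params": {"encrypt.kek.name": "csfle-key"},
      "onFailure": "ERROR",
      "onSuccess": "NONE"
    }
  ]
}
"""

doc = json.loads(schema_with_rules)  # raises ValueError if a comma is misplaced
tagged = [f["name"] for f in doc["fields"] if "PII" in f.get("confluent:tags", [])]
print(tagged)  # the fields the encrypt-pii rule protects
```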

Step 3: Query encrypted data

Run queries on the encrypted data. If you have DeveloperRead permission on the KEK, Flink transparently decrypts the encrypted fields for processing.

Example: Count customers by email domain

SELECT
  SUBSTRING(email, POSITION('@' IN email) + 1) AS domain,
  COUNT(*) AS customer_count
FROM customers_encrypted
GROUP BY SUBSTRING(email, POSITION('@' IN email) + 1);

In this query:

  • Flink decrypts the email field

  • The SQL operations work with plaintext email addresses

  • The results show aggregated counts by email domain
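The SUBSTRING/POSITION expression is equivalent to the following plain-Python sketch of what Flink computes after decryption (the sample email addresses are made up):

```python
from collections import Counter

# Decrypted email values as Flink would see them (made-up sample data).
emails = ["ann@example.com", "bob@example.com", "carol@test.org"]

# SUBSTRING(email, POSITION('@' IN email) + 1): everything after the '@'.
domains = [e[e.index("@") + 1:] for e in emails]
counts = Counter(domains)
print(counts)  # Counter({'example.com': 2, 'test.org': 1})
```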

Example: Filter by encrypted field

SELECT customer_id, email, city
FROM customers_encrypted
WHERE address LIKE '%New York%';

In this query:

  • Flink decrypts the address field for filtering

  • The WHERE clause operates on plaintext address values

  • Only matching rows are returned

What happens without key permissions

If you run Flink SQL statements without DeveloperRead permission on the encryption keys, the behavior depends on the encryption type:

CSFLE without permissions

Encrypted fields remain encrypted and pass through without decryption. Operations like COUNT(DISTINCT address) count distinct ciphertext values, not distinct addresses. For more information, see Process Encrypted Data with Confluent Cloud for Apache Flink.
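To see why COUNT(DISTINCT ...) over ciphertext is misleading, consider this toy illustration. The randomized cipher is modeled by a random nonce and is not the actual CSFLE algorithm, but it shows the effect: equal plaintexts produce different ciphertexts, so the distinct count over ciphertext overcounts:

```python
import os

def toy_encrypt(plaintext: bytes) -> bytes:
    # Stand-in for a randomized cipher: a fresh nonce makes every
    # ciphertext unique, even for identical plaintexts.
    return os.urandom(12) + plaintext

# Three rows with the same address (made-up sample data).
addresses = [b"42 Main St", b"42 Main St", b"42 Main St"]
ciphertexts = [toy_encrypt(a) for a in addresses]

print(len(set(addresses)))    # 1 distinct plaintext address
print(len(set(ciphertexts)))  # 3 "distinct" ciphertext values
```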

CSPE without permissions

The Flink SQL statement fails because Flink cannot decrypt the payload.

For a complete description of permission-based behavior, see Process Encrypted Data with Confluent Cloud for Apache Flink.