Use Client-Side Field Level Encryption in Confluent Cloud

Client-side field level encryption (CSFLE) in Confluent Cloud provides a robust security measure for protecting sensitive data within your Kafka topics. With CSFLE, you can encrypt specific fields in your data at the client application (producer) before it is sent to Confluent Cloud, and decrypt it at the client application (consumer) after it is received. This ensures that sensitive information is protected end-to-end, and is not readable by anyone who has access to the Kafka cluster, including Confluent personnel.

Note

This page describes how to configure client-side field level encryption programmatically. For steps on using the Confluent Cloud Console, see Manage Client-Side Field Level Encryption using Confluent Cloud Console.

CSFLE uses a technique called envelope encryption. This involves two types of keys:

  • Key Encryption Key (KEK): A master key used to encrypt other keys. KEKs are typically stored and managed in an external key management service (KMS), such as AWS KMS, Azure Key Vault, or Google Cloud KMS.
  • Data Encryption Key (DEK): The key used to encrypt the actual data fields. DEKs are themselves encrypted by a KEK.

Schema Registry plays a central role in managing these keys through its DEK Registry component. The DEK Registry stores the KEK metadata and the encrypted DEKs. It can also be configured to communicate directly with your KMS to manage the lifecycle of DEKs. This integration simplifies key management and enables other Confluent Cloud services, like Flink and ksqlDB, to process encrypted data.
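
To make the relationship between the two key types concrete, here is a minimal, self-contained sketch of envelope encryption using the JDK's javax.crypto primitives. It is purely illustrative: CSFLE performs these steps for you inside the serializer, and with a real KMS the KEK never leaves the key service.

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

public class EnvelopeEncryptionSketch {
    public static void main(String[] args) throws Exception {
        // The KEK normally lives in your KMS and never leaves it;
        // it is generated locally here only for illustration.
        SecretKey kek = newAesKey();
        // The DEK is the key that actually encrypts field data.
        SecretKey dek = newAesKey();

        // 1. Encrypt a field value (for example, an SSN) with the DEK.
        byte[] fieldIv = randomIv();
        Cipher fieldCipher = Cipher.getInstance("AES/GCM/NoPadding");
        fieldCipher.init(Cipher.ENCRYPT_MODE, dek, new GCMParameterSpec(128, fieldIv));
        byte[] encryptedField =
            fieldCipher.doFinal("123-45-6789".getBytes(StandardCharsets.UTF_8));

        // 2. Encrypt (wrap) the DEK with the KEK. Only this wrapped DEK is
        //    persisted, in the DEK Registry, alongside the KEK metadata.
        byte[] kekIv = randomIv();
        Cipher wrapCipher = Cipher.getInstance("AES/GCM/NoPadding");
        wrapCipher.init(Cipher.ENCRYPT_MODE, kek, new GCMParameterSpec(128, kekIv));
        byte[] wrappedDek = wrapCipher.doFinal(dek.getEncoded());

        System.out.printf("encrypted field: %d bytes, wrapped DEK: %d bytes%n",
            encryptedField.length, wrappedDek.length);
    }

    private static SecretKey newAesKey() throws Exception {
        KeyGenerator gen = KeyGenerator.getInstance("AES");
        gen.init(256);
        return gen.generateKey();
    }

    private static byte[] randomIv() {
        byte[] iv = new byte[12]; // 96-bit IV, the recommended size for GCM
        new SecureRandom().nextBytes(iv);
        return iv;
    }
}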

The overall workflow for implementing CSFLE is as follows:

  1. Set up your encryption keys: Create a Key Encryption Key (KEK) in your preferred Key Management Service (KMS).
  2. Register your KEK: Make Schema Registry aware of your KEK by registering it with the DEK Registry.
  3. Define what to encrypt: Use tags in your schemas to mark the fields that contain sensitive data (for example, “PII”).
  4. Create encryption rules: Define rules that tell the client which tags to look for and how to encrypt the fields associated with those tags. This includes specifying which KEK to use.
  5. Configure KMS access: Grant the DEK Registry permission to access your KMS for key operations, or configure your client application to do so directly.
  6. Produce and Consume encrypted data: Your producer client automatically encrypts the tagged fields before sending messages, and your consumer client decrypts them upon receipt.

Tip

Watch the Confluent Developer course Client-Side Field-Level Encryption (CSFLE) to learn more about CSFLE and how it works.

Requirements

To use client-side field level encryption (CSFLE) in Confluent Cloud, the following requirements must be met.

Stream Governance Advanced package

CSFLE requires the Stream Governance Advanced package for your Confluent Cloud environment.

Clients

Supported clients for use with CSFLE include:

  • Confluent Platform or Apache Kafka® Java Client. Supported versions include:
      • Confluent Platform 7.4.5 or later (Kafka 3.4.5 or later)
      • Confluent Platform 7.5.4 or later (Kafka 3.5.5 or later)
      • Confluent Platform 7.6.1 or later (Kafka 3.6.1 or later)
    For details, see Apache Kafka Java Client.
  • Confluent Go Client
  • Confluent .NET Client
  • Confluent JavaScript Client for Apache Kafka®
  • Confluent Python Client for Kafka

Key management service (KMS)

CSFLE supports the following key management services (KMS):

  • AWS KMS
  • Azure Key Vault
  • Google Cloud KMS

Confluent Cloud resources

For Confluent Cloud resources that are supported for CSFLE, see:

Configure client-side field level encryption

The following steps show how to configure client-side field level encryption for use with Confluent Cloud.

Step 1: Create a KEK in your key management service (KMS)

To start using CSFLE, the first step is to create a KEK using your key management service (KMS).

For the steps to create a KEK, see Create a KEK.

Step 2: Register the KEK with the DEK Registry

After you have a KEK, you can register it with the DEK Registry using Confluent Cloud Console, Confluent CLI, or REST APIs.

Note that if the KEK is not registered beforehand, the client can register it on demand, provided that the client has the appropriate permissions on the DEK Registry.

For the steps to register a KEK with the DEK Registry, see Register a KEK with the DEK Registry.
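
For illustration, here is a sketch of registering a KEK through the DEK Registry REST API. The KEK name, key ID, and credentials are placeholders, and you should confirm the request body fields against the DEK Registry API reference:

curl -u "<srApiKey>:<srApiSecret>" \
  -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"name": "my-kek", "kmsType": "aws-kms", "kmsKeyId": "<kmsKeyId>", "shared": false}' \
  https://psrc-xxxxx.<region>.<provider>.confluent.cloud/dek-registry/v1/keks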

Step 3: Add tags to the schema fields

Tags are metadata attributes that you can add to schema fields to specify which fields are encrypted. Encryption policies reference these tags to determine which fields in the data schema need to be encrypted and how they should be handled.

Important

Before using the tags, you must add the tag definitions in Stream Catalog.
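
For example, a PII tag definition can be created through the Stream Catalog tag definitions endpoint. This is a sketch with placeholder credentials; confirm the payload shape against the Catalog API reference:

curl -u "<srApiKey>:<srApiSecret>" \
  -X POST \
  -H "Content-Type: application/json" \
  --data '[{"name": "PII", "description": "Personally identifiable information"}]' \
  https://psrc-xxxxx.<region>.<provider>.confluent.cloud/catalog/v1/types/tagdefs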

Tags can either be inline or external. Inline tags are embedded directly in a schema, while external tags are specified using the Catalog API. When working with tags, keep in mind the following interaction between catalog tags and inline tags:

  • For a schema with no inline tags, you can add catalog tags using the Catalog API. Once an inline tag is applied, the schema evolves and all catalog tags are migrated into inline tags.
  • For a schema with inline tags, catalog tags cannot be applied. The request returns a 4xx error.

Here is an example of how tags can be added to schema fields in an Avro schema:

{
  "type":"record",
  "name":"MyRecord",
  "fields":[{
    "name":"ssn",
    "type":"string",
    "confluent:tags": [ "PII", "PRIVATE" ]
  }]
}

In this example, the Social Security Number (ssn) field includes the PII and PRIVATE tags to indicate that the field contains personally identifiable information (PII) that is marked as private.

Step 4: Define an encryption policy

After adding the tags, you need to define an encryption policy that specifies rules for which tags to use for encryption. The encryption policy is defined in a JSON file that is then uploaded to Confluent Cloud. Here is an example of an encryption policy:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<kekName>"
        }
      }
    ]
  }
}

Note that <kekName> is the name of the KEK you created in Step 1. If the KEK has not yet been registered, you can optionally specify the KMS key ID and KMS type in the rule, and the client automatically registers the KEK before registering any DEKs:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<kekName>",
           "encrypt.kms.key.id": "<kmsKeyId>",
           "encrypt.kms.type": "aws-kms"
        }
      }
    ]
  }
}

During registration, if the schema is omitted, then the ruleset attaches to the latest schema in the subject.
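
For illustration, a policy like the one above can be registered as a new schema version through the standard Schema Registry subjects endpoint. The subject name, policy file name, and credentials are placeholders:

curl -u "<srApiKey>:<srApiSecret>" \
  -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data @encryption-policy.json \
  https://psrc-xxxxx.<region>.<provider>.confluent.cloud/subjects/my-topic-value/versions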

After registration, you need to include the following in the client:

  • auto.register.schemas=false
  • use.latest.version=true

If you do not include these properties, the client attempts to register, or look up, a schema without any existing rules.
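
In a Java client properties file, these settings look like the following; the Avro serializer is shown as an example, so substitute the serializer that matches your schema format:

value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
auto.register.schemas=false
use.latest.version=true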

Encryption is supported for fields of type string or bytes. Type integer is currently not supported. If a client does not have a rule executor for the ENCRYPT rule type and attempts to consume a message, then the message is not decrypted and the client receives the encrypted values.

The following additional properties can be specified in the rule parameters:

  • encrypt.dek.algorithm: The encryption algorithm to use. Valid values are AES128_GCM, AES256_GCM (the default), and AES256_SIV. Use AES256_SIV for deterministic encryption, which ensures that the same input data always produces the same encrypted output. This is particularly useful for stream processing with Flink, because it allows consistent querying and joining of encrypted data. For more information, see Secure Stream Processing: Query Encrypted Data with Flink on Confluent Cloud.
  • encrypt.dek.expiry.days: If specified, automatic DEK rotation occurs. When a DEK is older than the expiration period, a new DEK is generated and used for new messages, while previous DEKs remain available to decrypt older messages. There is a limit to the number of DEKs (10,000) that can be retained in the DEK Registry, so use this property cautiously.
  • preserve.source.fields: For performance reasons, the fields of a message are updated in place during field-level transforms. For field-level encryption, this means the field values are replaced with the encrypted values. If the original field values should be retained in the message, set this property to true.
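
For example, a rule that uses deterministic encryption and rotates DEKs every 90 days might specify the following params (the values are illustrative):

"params": {
   "encrypt.kek.name": "<kekName>",
   "encrypt.dek.algorithm": "AES256_SIV",
   "encrypt.dek.expiry.days": "90"
}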

Step 5: Configure the KMS key encryption key

For Confluent Cloud, the DEK Registry can be granted permission to access your KMS.

To provide proper access for the DEK Registry, obtain the KMS key policy that needs to be added to the key, either through the Confluent Cloud Console or by using the following REST endpoint:

https://psrc-xxxxx.<region>.<provider>.confluent.cloud/dek-registry/v1/policy

If the DEK Registry has not been granted permission to access the KMS, then the credentials to access the KMS must be specified on the client.

For each of the supported KMS providers, a corresponding client encryption dependency is required. For example, for AWS KMS:

<dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-schema-registry-client-encryption-aws</artifactId>
   <version>7.6.0</version>
</dependency>

Next, you need to configure the KMS credentials on the clients. For example, for AWS KMS:

rule.executors._default_.param.access.key.id=<AWS access key>
rule.executors._default_.param.secret.access.key=<AWS secret key>

Alternatively, the AWS access key and AWS secret key can be passed using environment variables, named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
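
For example:

export AWS_ACCESS_KEY_ID=<AWS access key>
export AWS_SECRET_ACCESS_KEY=<AWS secret key>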

Now, whenever a message is sent, the ssn field is automatically encrypted during serialization and decrypted during deserialization.

Step 6: Produce with Kafka serializers

To produce with Kafka serializers, add the appropriate serializer to the producer properties:

  • KafkaAvroSerializer
  • KafkaProtobufSerializer
  • KafkaJsonSchemaSerializer
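
Putting the pieces together, here is a minimal Avro producer sketch. The bootstrap servers, credentials, and topic name are placeholders, and it assumes the schema with the tagged ssn field and the encryptPII rule have already been registered, with the kafka-schema-registry-client-encryption-aws dependency shown earlier on the classpath:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CsfleProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<bootstrapServers>");
        // Confluent Cloud broker security settings (SASL API key) omitted for brevity.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "<schemaRegistryUrl>");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<srApiKey>:<srApiSecret>");
        // Use the registered schema and its rule set; do not auto-register.
        props.put("auto.register.schemas", "false");
        props.put("use.latest.version", "true");
        // KMS credentials, needed if the DEK Registry was not granted KMS access.
        props.put("rule.executors._default_.param.access.key.id", "<awsAccessKey>");
        props.put("rule.executors._default_.param.secret.access.key", "<awsSecretKey>");

        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"ssn\","
                + "\"type\":\"string\",\"confluent:tags\":[\"PII\",\"PRIVATE\"]}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("ssn", "123-45-6789");

        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
            // The serializer encrypts the PII-tagged ssn field before the
            // message leaves the client.
            producer.send(new ProducerRecord<>("my-topic", "key1", record));
        }
    }
}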

Handle errors

If the Schema Registry client cannot decrypt the encrypted fields, it throws an error. You can configure the client to pass the encrypted data through in spite of the error.

Warning

Do not use dead-letter queues (DLQs) with the current version of CSFLE because of the potential risk that unencrypted, sensitive data might be exposed in the DLQ. This can occur if messages with fields expected to be encrypted fail to be processed and are routed to the DLQ without proper encryption. Work is in progress to address this issue.

Pass encrypted data through

To pass encrypted data through in spite of an error, specify an onFailure property in the rule. Here is an example:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<name of KEK>"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}

The onFailure setting value of ERROR,NONE above includes two comma-separated values, the first for encryption and the second for decryption.

  • ERROR: If encryption fails, an error is thrown.
  • NONE: If decryption fails, the encrypted value is passed through without decryption.

Rule ordering and its impact on DLQ

When multiple rules are defined in the domainRules array, they are processed sequentially in the order they appear. This ordering is critical for ensuring proper data protection, especially when considering potential DLQ scenarios.

Rule execution order

Rules in the domainRules array are executed in the order they are defined. Each rule processes the data after the previous rule has completed its transformation. This sequential processing means that the output of one rule becomes the input for the next rule.

Important

Rule ordering matters: The order in which you define your encryption rules can significantly impact the final state of your data and what gets sent to the DLQ if processing fails.

Example: Multiple encryption rules

Consider a scenario where you have multiple fields that need different levels of encryption:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "pii-kek"
        },
        "onFailure": "ERROR,NONE"
      },
      {
        "name": "encryptSensitive",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["SENSITIVE"],
        "params": {
           "encrypt.kek.name": "sensitive-kek"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}

In this example:

  1. First rule (encryptPII): Processes all fields tagged with PII using the pii-kek key.
  2. Second rule (encryptSensitive): Processes all fields tagged with SENSITIVE using the sensitive-kek key.

If a field has both PII and SENSITIVE tags, it is encrypted twice: first by the PII rule, then by the SENSITIVE rule.

Impact on DLQ scenarios

The rule execution order directly affects what data might be exposed in a DLQ:

Scenario 1: Early failure in rule chain

If the first rule fails and has onFailure: "ERROR", the message fails before reaching subsequent rules. This means:

  • Fields that should be encrypted by later rules remain unencrypted.
  • The unencrypted data could be sent to the DLQ.
  • Sensitive information is exposed.

Scenario 2: Late failure in rule chain

If a later rule fails, earlier rules have already processed the data:

  • Fields processed by earlier rules are properly encrypted.
  • Only fields that should be processed by the failing rule remain unencrypted.
  • Less sensitive data is exposed in the DLQ.

Best practices for rule ordering

To minimize data exposure in DLQ scenarios:

  • Order rules by sensitivity: Place rules for the most sensitive data first.
  • Use appropriate onFailure settings: Consider using "NONE" for less critical rules to allow processing to continue.
  • Test failure scenarios: Verify what data reaches the DLQ when rules fail.
  • Monitor rule execution: Track which rules are failing and adjust ordering accordingly.

Example: Optimized rule ordering

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptCritical",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["CRITICAL"],
        "params": {
           "encrypt.kek.name": "critical-kek"
        },
        "onFailure": "ERROR,NONE"
      },
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "pii-kek"
        },
        "onFailure": "NONE,NONE"
      },
      {
        "name": "encryptStandard",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["STANDARD"],
        "params": {
           "encrypt.kek.name": "standard-kek"
        },
        "onFailure": "NONE,NONE"
      }
    ]
  }
}

In this optimized configuration:

  • Critical data is encrypted first with strict error handling.
  • PII and standard data use "NONE" failure handling to allow processing to continue.
  • If any rule fails, the most sensitive data is already protected.

Warning

Test your rule ordering: Always test your rule configurations with failure scenarios to ensure that sensitive data is properly protected before reaching any DLQ or error handling mechanisms.

Access control (RBAC) for CSFLE

  • CSFLE authorizations are evaluated on the KEK resource by name. Bind roles on Kek:<kekName> in the target Schema Registry cluster for your Confluent Cloud environment.
  • RBAC roles do not decrypt DEKs. DEK encrypt/decrypt is enforced by your cloud KMS IAM (for example, AWS KMS, Azure Key Vault, Google Cloud KMS).

Note

KEK sharing with Confluent (Cloud-only)

  • In Confluent Cloud, you can optionally share KEK access with Confluent so that fully managed services can process encrypted data.
  • Whether or not the KEK is shared, only principals granted RBAC permission on the KEK can request DEKs to encrypt or decrypt data.
  • If the KEK is not shared, no Confluent-managed service or operator can decrypt data under any conditions. Encrypted DEKs are persisted, but without KEK access from the KMS the data cannot be decrypted.

Typical role sets

  • Developer on a subject using CSFLE:
    • Subject: DeveloperRead (and DeveloperWrite if producing/updating schemas).
    • KEK: DeveloperWrite (or DeveloperRead if only consuming) on Kek:<kekName>.
  • Service/resource owner:
    • ResourceOwner on the subject; optionally on the KEK used by that subject.
  • Platform/security admin:
    • ClusterAdmin on the Schema Registry cluster; SystemAdmin only for break-glass.
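
As a sketch, binding a developer role on a KEK with the Confluent CLI looks approximately like the following. The principal, environment, cluster ID, and KEK name are placeholders, and exact flag names vary by CLI version, so verify with confluent iam rbac role-binding create --help:

confluent iam rbac role-binding create \
  --principal User:u-ab12cd \
  --role DeveloperWrite \
  --environment env-xxxxx \
  --schema-registry-cluster lsrc-xxxxx \
  --resource Kek:my-kek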