Use Client-Side Field Level Encryption on Confluent Platform

Client-side field level encryption (CSFLE) uses a technique called envelope encryption, where a key encryption key (KEK) is used to encrypt data encryption keys (DEKs), which are the actual keys used to encrypt fields. The KEK is typically a key from an external Key Management System (KMS), such as AWS KMS, Azure Key Vault, or Google Cloud KMS.

Schema Registry exposes a subcomponent called the DEK Registry, which provides APIs for managing KEKs and DEKs. The key from the KMS is registered as a KEK to the DEK Registry, and the DEK Registry will also hold the encrypted DEKs used for CSFLE. DEKs are scoped by subject.

In Confluent Platform, for a given KEK, the DEK Registry can optionally be given permission to make direct calls to the KMS in order to encrypt and decrypt DEKs. The main advantage of granting this permission is to enable other Confluent Platform offerings, such as Flink and ksqlDB, to process data that was previously encrypted by CSFLE.

If permission is granted, then the DEK Registry will generate the DEK, encrypt it, and make the decrypted DEK available to clients that have the proper authorization. If permission is not granted, then the client will typically generate the DEK, encrypt it, and store it to the DEK Registry.
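To make the relationship between KEKs, DEKs, and subjects concrete, the following is a minimal in-memory sketch in Python. It is not the DEK Registry implementation: the class names, field names, and the registry_can_call_kms flag are hypothetical and exist only for illustration. It stores encrypted DEKs keyed by KEK name and subject, and reports which party would generate the DEK.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class Kek:
    """A registered key encryption key (all field names are hypothetical)."""
    name: str                    # name the KEK is registered under
    kms_type: str                # for example "aws-kms"
    kms_key_id: str              # identifier of the key inside the KMS
    registry_can_call_kms: bool  # True if the DEK Registry may call the KMS


@dataclass
class ToyDekRegistry:
    """In-memory stand-in that stores KEK metadata and encrypted DEKs only."""
    keks: Dict[str, Kek] = field(default_factory=dict)
    # DEKs are scoped by subject, so the lookup key is (KEK name, subject).
    encrypted_deks: Dict[Tuple[str, str], bytes] = field(default_factory=dict)

    def register_kek(self, kek: Kek) -> None:
        self.keks[kek.name] = kek

    def dek_generated_by(self, kek_name: str) -> str:
        """Return which party generates and encrypts the DEK for this KEK."""
        return "registry" if self.keks[kek_name].registry_can_call_kms else "client"

    def store_encrypted_dek(self, kek_name: str, subject: str, blob: bytes) -> None:
        if kek_name not in self.keks:
            raise KeyError(f"KEK {kek_name!r} is not registered")
        self.encrypted_deks[(kek_name, subject)] = blob

    def get_encrypted_dek(self, kek_name: str, subject: str) -> Optional[bytes]:
        return self.encrypted_deks.get((kek_name, subject))
```

In this sketch, a DEK stored for one subject is not visible under another subject, which mirrors the statement that DEKs are scoped by subject.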

At a high level, CSFLE requires the following steps:

  1. Create a KEK in an external KMS.
  2. Register the KEK to the DEK Registry.
  3. Specify the tags on fields.
  4. Declare encryption rules that specify which tags should be encrypted.
  5. Authorize the DEK Registry to access the KMS, or configure the client to access the KMS.
  6. Produce and consume data.


To use client-side field level encryption (CSFLE) on Confluent Platform, the following requirements must be met.

Confluent Platform

Confluent Platform Enterprise 7.9 or later.

To enable schema rules for Confluent Enterprise, add the following to the Schema Registry properties before starting Schema Registry. The value of the property below can be a comma-separated list of class names:



Supported clients for use with CSFLE include:

Confluent Platform or Apache Kafka® Java Client

Supported versions include:

  • Confluent Platform 7.4.5 or later (Kafka 3.4.5 or later)
  • Confluent Platform 7.5.4 or later (Kafka 3.5.5 or later)
  • Confluent Platform 7.6.1 or later (Kafka 3.6.1 or later)

For details, see Apache Kafka Java Client.

Confluent Go Client

Confluent .NET Client

Confluent JavaScript Client for Schema Registry

Confluent Python Client for Kafka

Key management service (KMS)

CSFLE supports the following key management services (KMS):

Confluent Platform resources

For Confluent Platform resources that are supported for CSFLE, see Use CSFLE for Confluent Enterprise.

Configure client-side field level encryption

The following steps show how to configure client-side field level encryption for use with Confluent Platform Enterprise.

Step 1: Create a KEK in your key management service (KMS)

To start using CSFLE, the first step is to create a KEK using your key management service (KMS).

For the steps to create a KEK, see Create a KEK.

Step 2: Register the KEK with the DEK Registry

After you have a KEK, you can register it with the DEK Registry using Confluent CLI or REST APIs.

Note that if the KEK is not registered beforehand, the client can register it on demand, assuming that the client has the appropriate permissions with the DEK Registry.

For the steps to register a KEK with the DEK Registry, see Register a KEK with the DEK Registry.
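As a rough illustration of registering a KEK over REST, the sketch below builds (but does not send) an HTTP request with the Python standard library. The endpoint path /dek-registry/v1/keks and the body fields name, kmsType, kmsKeyId, and shared are assumptions that should be checked against the DEK Registry API reference; the base URL and key values are placeholders.

```python
import json
from urllib import request


def build_register_kek_request(base_url, name, kms_type, kms_key_id, shared=False):
    """Build a POST request that would register a KEK (path and fields assumed)."""
    body = {
        "name": name,            # name later referenced by the encryption rule
        "kmsType": kms_type,     # for example "aws-kms"
        "kmsKeyId": kms_key_id,  # identifier of the key inside the KMS
        "shared": shared,        # assumed flag: registry may call the KMS itself
    }
    return request.Request(
        url=f"{base_url.rstrip('/')}/dek-registry/v1/keks",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


req = build_register_kek_request(
    "http://localhost:8081", "my-kek", "aws-kms", "<kmsKeyId>"
)
# Sending is left out on purpose; request.urlopen(req) would perform the call.
```

Building the request separately from sending it makes it easy to inspect the payload before anything reaches a live Schema Registry.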

Step 3: Add tags to the schema fields

Tags are metadata attributes that you can add to schema fields. Encryption policies refer to these tags to determine which fields in the data schema are encrypted and how they are handled.

Tags can either be inline or external. Here is an example of how tags can be added to schema fields in a JSON schema:

    "confluent:tags": [ "PII", "PRIVATE" ]

In this example, the Social Security Number (ssn) field is tagged with PII and PRIVATE to indicate that it contains personally identifiable information (PII) that is marked as private.
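For context, here is a small Python sketch of how an inline tag might sit inside a complete JSON schema. The Customer record and its name field are hypothetical; only the ssn field and the confluent:tags attribute come from the example above. The helper lists the fields that carry a given tag.

```python
import json

# Hypothetical JSON schema; only "ssn" and "confluent:tags" come from the text.
customer_schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Customer",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "ssn": {"type": "string", "confluent:tags": ["PII", "PRIVATE"]},
    },
}


def fields_with_tag(schema, tag):
    """Return the top-level property names that carry the given inline tag."""
    properties = schema.get("properties", {})
    return sorted(
        name
        for name, spec in properties.items()
        if tag in spec.get("confluent:tags", [])
    )


schema_text = json.dumps(customer_schema, indent=2)  # string form for registration
```

A helper like fields_with_tag can be handy for checking which fields an encryption rule would match before the schema is registered.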

Step 4: Define an encryption policy

After adding the tags, you need to define an encryption policy that specifies which tags to use for encryption. The encryption policy is defined in a JSON file that is then uploaded to Confluent Platform. Here is an example of an encryption policy:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<kekName>"
        }
      }
    ]
  }
}

Note that you specified the name of the KEK in step 1. If the KEK has not yet been registered, you can optionally specify the KMS key ID and KMS type in the rule. The client automatically registers the KEK before registering any DEKs.

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<kekName>",
           "encrypt.kms.key.id": "<kmsKeyId>",
           "encrypt.kms.type": "aws-kms"
        }
      }
    ]
  }
}

During registration, if the schema is omitted, then the ruleset attaches to the latest schema in the subject.
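The rule sets above can also be generated programmatically. The following Python sketch builds a registration payload that omits the schema, so the rule set would attach to the latest schema in the subject. The function name is hypothetical, and the parameter keys encrypt.kek.name and encrypt.kms.key.id are assumptions to verify against the rule parameter reference.

```python
import json
from typing import List, Optional


def build_encrypt_ruleset(
    kek_name: str,
    tags: List[str],
    kms_key_id: Optional[str] = None,
    kms_type: Optional[str] = None,
    rule_name: str = "encryptPII",
) -> dict:
    """Build a payload with one ENCRYPT domain rule and no schema."""
    params = {"encrypt.kek.name": kek_name}  # assumed parameter key
    # Only include the KMS details when both are given, so that an
    # unregistered KEK can be registered on demand by the client.
    if kms_key_id is not None and kms_type is not None:
        params["encrypt.kms.key.id"] = kms_key_id  # assumed parameter key
        params["encrypt.kms.type"] = kms_type
    rule = {
        "name": rule_name,
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": list(tags),
        "params": params,
    }
    return {"ruleSet": {"domainRules": [rule]}}


payload_text = json.dumps(build_encrypt_ruleset("my-kek", ["PII"]), indent=2)
```

Generating the payload in code keeps the rule name, tags, and KEK name in one place when the same policy is applied to several subjects.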

After registration, set the following properties in the client configuration:

  • auto.register.schemas=false
  • use.latest.version=true

If you do not include these properties, the client attempts to register, or look up, a schema without any existing rules.
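A simple pre-flight check can catch a client configuration that lacks these two settings. The Python sketch below is illustrative only: the helper names are hypothetical, the schema.registry.url entry in the usage is a placeholder, and values are kept as the strings shown above (some client libraries may expect booleans instead).

```python
# Settings the text above asks for, kept as properties-style strings.
REQUIRED_CSFLE_SETTINGS = {
    "auto.register.schemas": "false",
    "use.latest.version": "true",
}


def with_csfle_settings(props: dict) -> dict:
    """Return a copy of props with the required settings applied."""
    return {**props, **REQUIRED_CSFLE_SETTINGS}


def missing_csfle_settings(props: dict) -> list:
    """Return the names of required settings that are absent or wrong."""
    return sorted(
        key
        for key, expected in REQUIRED_CSFLE_SETTINGS.items()
        if str(props.get(key, "")).lower() != expected
    )


client_props = with_csfle_settings({"schema.registry.url": "http://localhost:8081"})
```

Running missing_csfle_settings on a configuration before creating the serializer gives an early, readable failure instead of a schema lookup that silently ignores the rules.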

Encryption is supported for fields of type string or bytes. Type integer is currently not supported. If a client does not have a rule executor for the ENCRYPT rule type and attempts to consume a message, then the message is not decrypted and the client receives the encrypted values.

The following additional properties can be specified in the rule parameters:

| Parameter | Description |
| --- | --- |
| encrypt.dek.algorithm | The encryption algorithm being used. Valid values include AES128_GCM, AES256_GCM (default), or AES256_SIV. You can use AES256_SIV for deterministic encryption of keys. |
| encrypt.dek.expiry.days | If specified, automatic DEK rotation occurs. When a DEK is older than the expiration period, a new DEK is generated and used for new messages, while previous DEKs remain available to decrypt older messages. There is a limit to the number of existing DEKs (10,000) that can be retained in the DEK Registry, so use this property cautiously. |
| preserve.source.fields | For performance reasons, the fields of a message are updated in place during field-level transforms. For field-level encryption, this results in the field values being replaced with the encrypted field values. If the original field values should be retained in the message, then set this property to true. |
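The rotation behavior described for encrypt.dek.expiry.days can be pictured with a short Python sketch. This is a simplified model, not the client's actual logic: the function names and the idea of integer DEK versions that increase by one are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def needs_new_dek(latest_created: datetime, expiry_days: Optional[int], now: datetime) -> bool:
    """Return True when the latest DEK is older than the expiration period."""
    if expiry_days is None:  # rotation is off when the property is not set
        return False
    return now - latest_created > timedelta(days=expiry_days)


def version_for_new_messages(
    created_by_version: Dict[int, datetime],
    expiry_days: Optional[int],
    now: datetime,
) -> int:
    """Pick the DEK version for new messages; older versions stay readable."""
    latest = max(created_by_version)
    if needs_new_dek(created_by_version[latest], expiry_days, now):
        return latest + 1  # a new DEK would be generated (assumed numbering)
    return latest


deks = {1: datetime(2024, 1, 1, tzinfo=timezone.utc)}
chosen = version_for_new_messages(deks, 30, datetime(2024, 2, 15, tzinfo=timezone.utc))
```

Because every rotation adds a DEK that must be retained for older messages, a very small expiry value moves a subject toward the retention limit faster.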

Step 5: Configure the KMS key encryption key

For Confluent Platform Enterprise, the DEK Registry can be granted permission to access your KMS.

To provide proper access for the DEK Registry, obtain the KMS key policy that needs to be added to the key by using the following REST endpoint:


If the DEK Registry has not been granted permission to access the KMS, then the credentials to access the KMS must be specified on the client.

For each of the supported KMS providers, the following dependencies are required:


Next, you need to configure the following parameters on the clients. For AWS KMS, for example:

rule.executors._default_.param.access.key.id=<AWS access key>
rule.executors._default_.param.secret.access.key=<AWS secret key>

Alternatively, the AWS access key and AWS secret key can be passed using environment variables, named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
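When both options are available, it can help to check up front where the client would find its AWS credentials. The Python sketch below is a hypothetical pre-flight helper, not part of any Confluent client. It assumes that explicit rule executor properties take precedence over environment variables, and that the access key property is named rule.executors._default_.param.access.key.id.

```python
import os
from typing import Mapping, Optional

ACCESS_KEY_PROP = "rule.executors._default_.param.access.key.id"  # assumed name
SECRET_KEY_PROP = "rule.executors._default_.param.secret.access.key"


def aws_credential_source(
    props: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Report where AWS credentials would come from, never the values."""
    env = os.environ if environ is None else environ
    if props.get(ACCESS_KEY_PROP) and props.get(SECRET_KEY_PROP):
        return "properties"
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "environment"
    return "missing"
```

The helper returns only the source name, so it is safe to log the result without exposing the secret key.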

Now, whenever a message is sent, the ssn field is automatically encrypted before serialization and decrypted after deserialization.

Step 6: Produce with Kafka serializers

To produce with Kafka serializers, add the appropriate serializer to the producer properties:

  • KafkaAvroSerializer
  • KafkaProtobufSerializer
  • KafkaJsonSchemaSerializer
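As an illustration of choosing the serializer, the Python sketch below generates Java-style producer properties for a given schema format. The helper names are hypothetical, the fully qualified class names are given as assumptions to confirm against the serializer documentation, and the broker and Schema Registry addresses are placeholders.

```python
# Assumed fully qualified class names for the serializers listed above.
SERIALIZER_CLASSES = {
    "AVRO": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "PROTOBUF": "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer",
    "JSON": "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer",
}


def producer_properties(schema_type: str, bootstrap_servers: str, schema_registry_url: str) -> dict:
    """Return producer properties with the serializer for the schema format."""
    try:
        serializer = SERIALIZER_CLASSES[schema_type.upper()]
    except KeyError:
        raise ValueError(f"unsupported schema type: {schema_type}") from None
    return {
        "bootstrap.servers": bootstrap_servers,
        "schema.registry.url": schema_registry_url,
        "value.serializer": serializer,
        # Settings described earlier so that the registered rules are applied.
        "auto.register.schemas": "false",
        "use.latest.version": "true",
    }


def to_properties_text(props: dict) -> str:
    """Render the properties as key=value lines, sorted for stable output."""
    return "\n".join(f"{key}={props[key]}" for key in sorted(props))


text = to_properties_text(
    producer_properties("avro", "localhost:9092", "http://localhost:8081")
)
```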

Access control (RBAC) for CSFLE

You must set the following property to enforce RBAC for CSFLE on Confluent Enterprise: dek.registry.rbac.enable=true. To learn more about this configuration option, see the entry for dek.registry.rbac.enable in the Schema Registry Configuration Reference.

RBAC role bindings for CSFLE on Confluent Enterprise are shown below.

KEK/DEK Role Permissions

| Role Name | Create KEK/DEK | Read KEK/DEK | Update KEK/DEK | Delete KEK/DEK |
| --- | --- | --- | --- | --- |
| ResourceOwner | ✔ (based on scoping) | ✔ (based on scoping) | ✔ (based on scoping) | ✔ (based on scoping) |
| DeveloperRead | | ✔ (based on scoping) | | |
| DeveloperWrite | ✔ (based on scoping) | ✔ (based on scoping) | ✔ (based on scoping) | |
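The role permissions above can be expressed as a small lookup, which is sometimes useful in tests or tooling. The Python sketch below only mirrors the table; it is not how Confluent Platform evaluates RBAC, and the in_scope argument is a hypothetical stand-in for "based on scoping".

```python
# Operations each role may perform on KEKs and DEKs, taken from the table.
ROLE_PERMISSIONS = {
    "ResourceOwner": {"create", "read", "update", "delete"},
    "DeveloperRead": {"read"},
    "DeveloperWrite": {"create", "read", "update"},
}


def is_allowed(role: str, operation: str, in_scope: bool = True) -> bool:
    """Return True if the role permits the operation on an in-scope resource."""
    if not in_scope:  # a role binding outside the resource's scope grants nothing
        return False
    return operation in ROLE_PERMISSIONS.get(role, set())
```

For example, is_allowed("DeveloperWrite", "delete") evaluates to False, matching the empty Delete cell for that role.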

Handle errors

If the Schema Registry client cannot decrypt the encrypted fields, it throws an error. You can configure the client to pass the encrypted data through in spite of the error.


Do not use dead-letter queues (DLQs) with the current version of CSFLE because of the potential risk that unencrypted, sensitive data might be exposed in the DLQ. This can occur if messages with fields expected to be encrypted fail to be processed and are routed to the DLQ without proper encryption. Work is in progress to address this issue.

To pass encrypted data through

To pass encrypted data through in spite of an error, specify an onFailure property in the rule. Here is an example:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<name of KEK>"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}

The onFailure value ERROR,NONE above consists of two comma-separated actions: the first applies to encryption and the second to decryption.

  • ERROR: If encryption fails, an error is thrown.
  • NONE: If decryption fails, the encrypted value is passed through without decryption.
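The effect of the two actions can be simulated with a short Python sketch. It models only the two-value form described above and is not the client's rule engine; the function names and the decrypt callable are hypothetical.

```python
from typing import Callable, Tuple


def parse_on_failure(value: str) -> Tuple[str, str]:
    """Split an onFailure value into (encryption action, decryption action)."""
    parts = [part.strip() for part in value.split(",")]
    if len(parts) != 2:
        raise ValueError("this sketch expects exactly two comma-separated actions")
    return parts[0], parts[1]


def read_field(encrypted_value: str, decrypt: Callable[[str], str], on_failure: str = "ERROR,NONE") -> str:
    """Decrypt a field, passing the encrypted value through if the action is NONE."""
    _, decrypt_action = parse_on_failure(on_failure)
    try:
        return decrypt(encrypted_value)
    except Exception:
        if decrypt_action == "NONE":
            return encrypted_value  # pass the still-encrypted value through
        raise  # ERROR: surface the failure to the caller
```

With ERROR,NONE, a consumer that cannot reach the KMS still receives the message, but the tagged fields remain encrypted.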