Use Client-Side Field Level Encryption in Confluent Cloud

For steps on using Confluent Cloud Console to configure client-side field level encryption, see Manage Client-Side Field Level Encryption using Confluent Cloud Console.

Client-side field level encryption (CSFLE) uses a technique called envelope encryption, where a key encryption key (KEK) is used to encrypt data encryption keys (DEKs), which are the actual keys used to encrypt fields. The KEK is typically a key from an external Key Management System (KMS), such as AWS KMS, Azure Key Vault, or Google Cloud KMS.

Schema Registry exposes a subcomponent called the DEK Registry, which provides APIs for managing KEKs and DEKs. The key from the KMS is registered as a KEK to the DEK Registry, and the DEK Registry will also hold the encrypted DEKs used for CSFLE. DEKs are scoped by subject.

In Confluent Cloud, for a given KEK, the DEK Registry can optionally be given permission to make direct calls to the KMS in order to encrypt and decrypt DEKs. The main advantage of granting this permission is to enable other Confluent Cloud offerings, such as Flink and ksqlDB, to process data that was previously encrypted by CSFLE.

If permission is granted, then the DEK Registry will generate the DEK, encrypt it, and make the decrypted DEK available to clients that have the proper authorization. If permission is not granted, then the client will typically generate the DEK, encrypt it, and store it to the DEK Registry.
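The envelope-encryption flow described above can be illustrated with a minimal, self-contained Java sketch. It is not the Confluent client's implementation: the KEK here is a locally generated AES key standing in for a key held in an external KMS, and the class and method names (`EnvelopeEncryptionSketch`, `roundTrip`, and so on) are hypothetical. It only shows the idea that the KEK encrypts the DEK and the DEK encrypts the field value.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

final class EnvelopeEncryptionSketch {
    private static final int IV_BYTES = 12;   // recommended IV size for AES-GCM
    private static final int TAG_BITS = 128;  // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    // Generates a random 256-bit AES key (used for both the stand-in KEK and the DEK).
    static SecretKey newAesKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(256);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // AES-GCM encryption; the result is IV || ciphertext-with-tag.
    static byte[] encrypt(SecretKey key, byte[] plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            byte[] out = Arrays.copyOf(iv, iv.length + ciphertext.length);
            System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reverses encrypt(): reads the IV from the first 12 bytes, then decrypts the rest.
    static byte[] decrypt(SecretKey key, byte[] ivAndCiphertext) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, ivAndCiphertext, 0, IV_BYTES));
            return cipher.doFinal(ivAndCiphertext, IV_BYTES, ivAndCiphertext.length - IV_BYTES);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Producer side wraps the DEK with the KEK; consumer side unwraps it to read the field.
    static String roundTrip(String fieldValue) {
        SecretKey kek = newAesKey(); // assumption: stands in for the KMS-held KEK
        SecretKey dek = newAesKey();
        byte[] encryptedDek = encrypt(kek, dek.getEncoded()); // the form a registry would store
        byte[] encryptedField = encrypt(dek, fieldValue.getBytes(StandardCharsets.UTF_8));

        SecretKey recoveredDek = new SecretKeySpec(decrypt(kek, encryptedDek), "AES");
        return new String(decrypt(recoveredDek, encryptedField), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("123-45-6789"));
    }
}
```

In a real deployment the wrap and unwrap of the DEK would be performed by the KMS (called either by the DEK Registry or by the client), so the KEK material never leaves the KMS.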

At a high level, CSFLE requires the following steps:

  1. Create a KEK in an external KMS.
  2. Register the KEK to the DEK Registry.
  3. Specify the tags on fields.
  4. Declare encryption rules that specify which tags should be encrypted.
  5. Authorize the DEK Registry to access the KMS in Confluent Cloud, or configure the client to access the KMS.
  6. Produce and consume data.

Requirements

To use client-side field level encryption (CSFLE) in Confluent Cloud, the following requirements must be met.

Stream Governance Advanced package

Clients

Supported clients for use with CSFLE include:

Confluent Platform or Apache Kafka® Java Client

Supported versions include:

  • Confluent Platform 7.4.5 or later (Kafka 3.4.5 or later)
  • Confluent Platform 7.5.4 or later (Kafka 3.5.4 or later)
  • Confluent Platform 7.6.1 or later (Kafka 3.6.1 or later)

For details, see Apache Kafka Java Client.

Confluent Go Client

Confluent .NET Client

Confluent Node.js Client for Schema Registry (Early Access)

Note

Confluent Node.js Client support for CSFLE and OAuth is an Early Access Program feature in Confluent Cloud. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions. To participate in this Early Access Program, contact your Confluent account manager.

Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.

Key management service (KMS)

CSFLE supports the following key management services (KMS):

Confluent Cloud resources

For Confluent Cloud resources that are supported for CSFLE, see:

Configure client-side field level encryption

The following steps show how to configure client-side field level encryption for use with Confluent Cloud.

Step 1: Create a KEK in your key management service (KMS)

To start using CSFLE, the first step is to create a KEK using your key management service (KMS).

For the steps to create a KEK, see Create a KEK.

Step 2: Register the KEK with the DEK Registry

After you have a KEK, you can register it with the DEK Registry using Confluent Cloud Console, Confluent CLI, or REST APIs.

If the KEK has not been registered beforehand, the client can register it on demand, provided that the client has the appropriate permissions with the DEK Registry.

For the steps to register a KEK with the DEK Registry, see Register a KEK with the DEK Registry.

Step 3: Add tags to the schema fields

Tags are metadata attributes that you can add to schema fields. The encryption policy refers to these tags to determine which fields in the schema are encrypted and how they are handled.

Important

Before using the tags, you must add the tag definitions in Stream Catalog.

Tags can either be inline or external. Here is an example of how inline tags can be added to schema fields in an Avro schema:

{
  "type":"record",
  "name":"MyRecord",
  "fields":[{
    "name":"ssn",
    "type":"string",
    "confluent:tags": [ "PII", "PRIVATE" ]
  }]
}

In this example, the Social Security Number (ssn) field includes PII and PRIVATE to indicate that the field includes personally identifiable information (PII) that is marked as private.

Step 4: Define an encryption policy

After adding the tags, you need to define an encryption policy that specifies which tags to use for encryption. The encryption policy is defined in a JSON file that is then uploaded to Confluent Cloud. Here is an example of an encryption policy:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<kekName>"
        }
      }
    ]
  }
}

The value of encrypt.kek.name is the name under which the KEK is registered with the DEK Registry. If the KEK has not yet been registered, you can optionally specify the KMS key ID and KMS type in the rule, as shown in the following example. The client then automatically registers the KEK before registering any DEKs.

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<kekName>",
           "encrypt.kms.key.id": "<kmsKeyId>",
           "encrypt.kms.type": "aws-kms"
        }
      }
    ]
  }
}

During registration, if the schema is omitted, then the ruleset attaches to the latest schema in the subject.

After registration, you need to include the following in the client:

  • auto.register.schemas=false
  • use.latest.version=true

If you do not include these properties, the client attempts to register, or look up, a schema without any existing rules.
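As a minimal sketch, the two properties above could be set on a Java client as follows. The class and method names are hypothetical, and `schema.registry.url` is included only to make the example complete; the URL shown is a placeholder. Authentication settings are omitted.

```java
import java.util.Properties;

final class CsfleClientConfigSketch {
    // Builds serializer/deserializer settings so the client uses the latest
    // registered schema (with its rules) instead of registering its own.
    static Properties csfleSerializerProps(String schemaRegistryUrl) {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        props.setProperty("auto.register.schemas", "false");
        props.setProperty("use.latest.version", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder URL; substitute the endpoint of your Schema Registry.
        Properties props =
                csfleSerializerProps("https://psrc-xxxxx.<region>.<provider>.confluent.cloud");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would normally be merged into the same `Properties` object that is passed to the producer or consumer.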

Encryption is supported for fields of type string or bytes. Type integer is currently not supported. If a client does not have a rule executor for the ENCRYPT rule type and attempts to consume a message, then the message is not decrypted and the client receives the encrypted values.

The following additional properties can be specified in the rule parameters:

  • encrypt.dek.algorithm: The encryption algorithm being used. Valid values include AES128_GCM, AES256_GCM (default), or AES256_SIV. You can use AES256_SIV for deterministic encryption of keys.
  • encrypt.dek.expiry.days: If specified, automatic DEK rotation occurs. When a DEK is older than the expiration period, a new DEK is generated and used for new messages, while previous DEKs remain available to decrypt older messages. There is a limit to the number of existing DEKs (10,000) that can be retained in the DEK Registry, so use this property cautiously.
  • preserve.source.fields: For performance reasons, the fields of a message are updated in place during field-level transforms. For field-level encryption, this results in the field values being replaced with the encrypted field values. If the original field values should be retained in the message, set this property to true.
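The rotation behavior described for encrypt.dek.expiry.days amounts to an age check on the current DEK. The following Java sketch illustrates that check only; it is an assumption about the semantics stated above, not the client's actual code, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

final class DekRotationSketch {
    // Returns true when the DEK is older than the expiry period, meaning a new
    // DEK would be generated for new messages. A non-positive expiryDays is
    // treated here as "rotation not configured" (an assumption of this sketch).
    static boolean needsRotation(Instant dekCreatedAt, int expiryDays, Instant now) {
        if (expiryDays <= 0) {
            return false;
        }
        Duration age = Duration.between(dekCreatedAt, now);
        return age.compareTo(Duration.ofDays(expiryDays)) > 0;
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2024-01-01T00:00:00Z");
        Instant now = Instant.parse("2024-02-15T00:00:00Z");
        System.out.println(needsRotation(created, 30, now));
    }
}
```

Because older DEKs are kept so that earlier messages can still be decrypted, a short expiry period on many subjects increases the number of retained DEKs and approaches the registry limit sooner.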

Step 5: Configure the KMS key encryption key

For Confluent Cloud, the DEK Registry can be granted permission to access your KMS.

To provide proper access for the DEK Registry, obtain the KMS key policy that needs to be added to the key, either through the Confluent Cloud Console or by using the following REST endpoint:

https://psrc-xxxxx.<region>.<provider>.confluent.cloud/dek-registry/v1/policy
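As a hedged sketch, a request to this endpoint could be built with the JDK's HTTP client as shown below. The class and method names are hypothetical, the host in `main` is a made-up placeholder, and the use of HTTP Basic authentication with a Schema Registry API key and secret is an assumption; confirm the authentication method for your environment.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class DekRegistryPolicyRequestSketch {
    // Builds (but does not send) a GET request for the KMS key policy.
    static HttpRequest policyRequest(String schemaRegistryBaseUrl, String apiKey, String apiSecret) {
        String credentials = Base64.getEncoder()
                .encodeToString((apiKey + ":" + apiSecret).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder()
                .uri(URI.create(schemaRegistryBaseUrl + "/dek-registry/v1/policy"))
                .header("Authorization", "Basic " + credentials) // assumption: Basic auth
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Placeholder host and credentials, for illustration only.
        HttpRequest request = policyRequest(
                "https://psrc-xxxxx.us-east-2.aws.confluent.cloud", "<api-key>", "<api-secret>");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the returned policy added to the key in your KMS.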

If the DEK Registry has not been granted permission to access the KMS, then the credentials to access the KMS must be specified on the client.

Each supported KMS provider requires its own client dependency. For example, AWS KMS requires the following dependency:

<dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-schema-registry-client-encryption-aws</artifactId>
   <version>7.6.0</version>
</dependency>

Next, you need to configure the following parameters on the clients.

rule.executors._default_.param.access.key.id=<AWS access key>
rule.executors._default_.param.secret.access.key=<AWS secret key>

Alternatively, the AWS access key and AWS secret key can be passed using environment variables, named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
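The two configuration options can be combined in client code along the following lines. This is a minimal sketch with hypothetical class and method names: when explicit credentials are supplied they are set as rule executor parameters, and otherwise the properties are left unset so that the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables apply.

```java
import java.util.Properties;

final class AwsKmsClientCredentialsSketch {
    static final String ACCESS_KEY_PROP = "rule.executors._default_.param.access.key.id";
    static final String SECRET_KEY_PROP = "rule.executors._default_.param.secret.access.key";

    // Returns a copy of the base properties, adding AWS credentials only when
    // both values are provided. Passing null leaves the properties unset.
    static Properties withAwsCredentials(Properties base, String accessKeyId, String secretAccessKey) {
        Properties props = new Properties();
        props.putAll(base);
        if (accessKeyId != null && secretAccessKey != null) {
            props.setProperty(ACCESS_KEY_PROP, accessKeyId);
            props.setProperty(SECRET_KEY_PROP, secretAccessKey);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = withAwsCredentials(new Properties(), "<AWS access key>", "<AWS secret key>");
        System.out.println(props.stringPropertyNames());
    }
}
```

Avoid hard-coding real credentials in source files; load them from a secrets store or rely on the environment variables instead.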

Now, whenever a message is sent, the ssn field is automatically encrypted before serialization and decrypted after deserialization.

Step 6: Produce with Kafka serializers

To produce with Kafka serializers, add the appropriate serializer to the producer properties:

  • KafkaAvroSerializer
  • KafkaProtobufSerializer
  • KafkaJsonSchemaSerializer
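A minimal sketch of selecting one of these serializers in the producer properties follows. The class names are given as strings so the example compiles without Kafka on the classpath; the enum, class, and method names are hypothetical, and the use of a string key serializer is an assumption made for illustration.

```java
import java.util.Properties;

final class CsfleProducerConfigSketch {
    // Maps each schema format to the fully qualified name of its serializer.
    enum SchemaFormat {
        AVRO("io.confluent.kafka.serializers.KafkaAvroSerializer"),
        PROTOBUF("io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"),
        JSON_SCHEMA("io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");

        final String serializerClass;

        SchemaFormat(String serializerClass) {
            this.serializerClass = serializerClass;
        }
    }

    // Builds producer properties whose value serializer matches the schema format.
    static Properties producerProps(String bootstrapServers, SchemaFormat format) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", format.serializerClass);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("<bootstrap-server>:9092", SchemaFormat.AVRO);
        System.out.println(props.getProperty("value.serializer"));
    }
}
```

The Schema Registry and CSFLE settings described in the earlier steps would be added to the same properties before constructing the producer.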

Handle errors

If the Schema Registry client cannot decrypt the encrypted fields, it throws an error. You can configure the client to pass the encrypted data through in spite of the error, or you can send the records to a dead-letter queue (DLQ).

To pass encrypted data through

To pass encrypted data through in spite of an error, specify an onFailure property in the rule. Here is an example:

{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
           "encrypt.kek.name": "<name of KEK>"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}

The onFailure setting value of ERROR,NONE above includes two comma-separated values, the first for encryption and the second for decryption.

  • ERROR: If encryption fails, an error is thrown.
  • NONE: If decryption fails, the encrypted value is passed through without decryption.
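To make the two-part value concrete, the following Java sketch splits an onFailure string into its encryption (write) and decryption (read) actions. It illustrates the format described above and is not the client's parser; the class and field names are hypothetical, and only the two-value form is handled.

```java
final class OnFailureSketch {
    final String onWrite; // action applied when encryption fails
    final String onRead;  // action applied when decryption fails

    private OnFailureSketch(String onWrite, String onRead) {
        this.onWrite = onWrite;
        this.onRead = onRead;
    }

    // Parses a value such as "ERROR,NONE" into its write and read actions.
    static OnFailureSketch parse(String onFailure) {
        String[] parts = onFailure.split(",", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    "Expected two comma-separated actions, got: " + onFailure);
        }
        return new OnFailureSketch(parts[0].trim(), parts[1].trim());
    }

    public static void main(String[] args) {
        OnFailureSketch actions = parse("ERROR,NONE");
        System.out.println("write=" + actions.onWrite + " read=" + actions.onRead);
    }
}
```

With ERROR,NONE, a producer fails fast rather than writing unencrypted data, while a consumer that cannot decrypt still receives the record with the field left encrypted.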