Use Client-Side Field Level Encryption in Confluent Cloud¶
For steps on using Confluent Cloud Console to configure client-side field level encryption, see Manage Client-Side Field Level Encryption using Confluent Cloud Console.
Client-side field level encryption (CSFLE) uses a technique called envelope encryption, where a key encryption key (KEK) is used to encrypt data encryption keys (DEKs), which are the actual keys used to encrypt fields. The KEK is typically a key from an external Key Management System (KMS), such as AWS KMS, Azure Key Vault, or Google Cloud KMS.
Schema Registry exposes a subcomponent called the DEK Registry, which provides APIs for managing KEKs and DEKs. The key from the KMS is registered as a KEK to the DEK Registry, and the DEK Registry will also hold the encrypted DEKs used for CSFLE. DEKs are scoped by subject.
In Confluent Cloud, for a given KEK, the DEK Registry can optionally be given permission to make direct calls to the KMS in order to encrypt and decrypt DEKs. The main advantage of granting this permission is to enable other Confluent Cloud offerings, such as Flink and ksqlDB, to process data that was previously encrypted by CSFLE.
If permission is granted, then the DEK Registry will generate the DEK, encrypt it, and make the decrypted DEK available to clients that have the proper authorization. If permission is not granted, then the client will typically generate the DEK, encrypt it, and store it to the DEK Registry.
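To make envelope encryption concrete, here is a minimal, illustrative Java sketch using javax.crypto. It is a local simulation only: in CSFLE the KEK lives in your KMS and never leaves it, and the wrapped DEK is stored in the DEK Registry rather than alongside the data.

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

public class EnvelopeEncryptionSketch {
    public static void main(String[] args) throws Exception {
        // KEK: in CSFLE this key lives in the KMS and is never exported.
        SecretKey kek = KeyGenerator.getInstance("AES").generateKey();
        // DEK: the key that actually encrypts field values.
        SecretKey dek = KeyGenerator.getInstance("AES").generateKey();

        // Encrypt a field value with the DEK (AES-GCM).
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);
        Cipher fieldCipher = Cipher.getInstance("AES/GCM/NoPadding");
        fieldCipher.init(Cipher.ENCRYPT_MODE, dek, new GCMParameterSpec(128, iv));
        byte[] encryptedField =
                fieldCipher.doFinal("123-45-6789".getBytes(StandardCharsets.UTF_8));

        // Wrap (encrypt) the DEK with the KEK; the wrapped DEK is what
        // the DEK Registry stores, scoped by subject.
        Cipher wrapCipher = Cipher.getInstance("AESWrap");
        wrapCipher.init(Cipher.WRAP_MODE, kek);
        byte[] wrappedDek = wrapCipher.wrap(dek);

        System.out.printf("encrypted field: %d bytes, wrapped DEK: %d bytes%n",
                encryptedField.length, wrappedDek.length);
    }
}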
At a high level, CSFLE requires the following steps:
- Create a KEK in an external KMS.
- Register the KEK to the DEK Registry.
- Specify the tags on fields.
- Declare encryption rules that specify which tags should be encrypted.
- Authorize the DEK Registry to access the KMS in Confluent Cloud, or configure the client to access the KMS.
- Produce and consume data.
Requirements¶
To use client-side field level encryption (CSFLE) in Confluent Cloud, the following requirements must be met.
Stream Governance Advanced package¶
- Stream Governance Advanced package must be enabled.
- For more information, see Data Contracts for Schema Registry on Confluent Cloud.
Clients¶
Supported clients for use with CSFLE include:
Confluent Platform or Apache Kafka® Java Client¶
Supported versions include:
- Confluent Platform 7.4.5 or later (Kafka 3.4.5 or later)
- Confluent Platform 7.5.4 or later (Kafka 3.5.5 or later)
- Confluent Platform 7.6.1 or later (Kafka 3.6.1 or later)
For details, see Apache Kafka Java Client.
Confluent Go Client¶
- Requires version 2.5.0 or later
- For details, see Confluent Golang Client for Apache Kafka.
- For examples, see confluent-kafka-go/examples.
Confluent .NET Client¶
- Requires version 2.5.0 or later
- For details, see Confluent .NET Client for Apache Kafka.
- For examples, see confluent-kafka-dotnet/examples.
Confluent Node.js Client for Schema Registry (Early Access)¶
Note
Confluent Node.js Client support for CSFLE and OAuth is an Early Access Program feature in Confluent Cloud. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions. To participate in this Early Access Program, contact your Confluent account manager.
Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.
- Requires version 0.2.1 or later.
- For details, see Confluent Node.js Client.
- GitHub repository: confluent-kafka-javascript.
Kafka serializers and deserializers¶
CSFLE works with the latest versions of the existing Java serializers and deserializers for:
- Avro. For more information, see Avro Schema Serializer and Deserializer for Schema Registry on Confluent Cloud.
- JSON Schema. For more information, see JSON Schema Serializer and Deserializer for Schema Registry on Confluent Cloud.
- Protobuf. For more information, see Protobuf Schema Serializer and Deserializer for Schema Registry on Confluent Cloud.
Key management service (KMS)¶
CSFLE supports the following key management services (KMS):
- AWS Key Management Service (AWS KMS)
- Azure Key Vault
- Google Cloud Key Management Service (Cloud KMS)
- HashiCorp Vault Transit Secrets Engine. Applies only when the KEK is not shared with Confluent.
- Local key (for testing only)
Confluent Cloud resources¶
For Confluent Cloud resources that are supported for CSFLE, see:
Configure client-side field level encryption¶
The following steps show how to configure client-side field level encryption for use with Confluent Cloud.
Step 1: Create a KEK in your key management service (KMS)¶
To start using CSFLE, the first step is to create a KEK using your key management service (KMS).
For the steps to create a KEK, see Create a KEK.
Step 2: Register the KEK with the DEK Registry¶
After you have a KEK, you can register it with the DEK Registry using Confluent Cloud Console, Confluent CLI, or REST APIs.
Note that if the KEK is not registered beforehand, the client can register it on demand, provided the client has the appropriate permissions on the DEK Registry.
For the steps to register a KEK with the DEK Registry, see Register a KEK with the DEK Registry.
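For illustration, the following Java sketch registers a KEK over the DEK Registry REST API with java.net.http. The payload fields (name, kmsType, kmsKeyId, shared), the endpoint host, and the credentials are assumptions for this sketch; consult the DEK Registry API reference for the exact request shape.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class RegisterKekSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders: substitute your Schema Registry endpoint and API key.
        String baseUrl = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud";
        String auth = Base64.getEncoder()
                .encodeToString("<sr-api-key>:<sr-api-secret>".getBytes("UTF-8"));

        // Assumed payload shape: KEK name, KMS type and key, and whether the
        // KEK is shared with Confluent (which lets the DEK Registry call the KMS).
        String body = "{"
                + "\"name\": \"my-kek\","
                + "\"kmsType\": \"aws-kms\","
                + "\"kmsKeyId\": \"arn:aws:kms:...\","
                + "\"shared\": false"
                + "}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/dek-registry/v1/keks"))
                .header("Authorization", "Basic " + auth)
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}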
Step 3: Add tags to the schema fields¶
Tags are metadata attributes that you can add to schema fields to mark which fields contain sensitive data. Encryption policies reference these tags to specify which fields in the schema must be encrypted and how they are handled.
Important
Before using the tags, you must add the tag definitions in Stream Catalog.
Tags can be either inline or external. Here is an example of how tags can be added to schema fields in an Avro schema:
{
"type":"record",
"name":"MyRecord",
"fields":[{
"name":"ssn",
"type":"string",
"confluent:tags": [ "PII", "PRIVATE" ]
}]
}
In this example, the Social Security Number (ssn) field includes the PII and PRIVATE tags to indicate that the field contains personally identifiable information (PII) that is marked as private.
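Because inline tags are ordinary schema field properties, they travel with the schema and can be inspected with the Avro Java API. A small sketch (assuming Avro 1.9 or later for Field.getObjectProp):

import org.apache.avro.Schema;

public class TagInspectionSketch {
    public static void main(String[] args) {
        String schemaJson = "{"
                + "\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{"
                + "\"name\":\"ssn\",\"type\":\"string\","
                + "\"confluent:tags\":[\"PII\",\"PRIVATE\"]}]}";

        Schema schema = new Schema.Parser().parse(schemaJson);
        // Non-reserved properties such as confluent:tags are preserved by
        // Avro and surface as object props on the field.
        Object tags = schema.getField("ssn").getObjectProp("confluent:tags");
        System.out.println("confluent:tags = " + tags);  // [PII, PRIVATE]
    }
}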
Step 4: Define an encryption policy¶
After adding the tags, you need to define an encryption policy that specifies which tagged fields to encrypt. The encryption policy is defined in a JSON file that is then uploaded to Confluent Cloud. Here is an example of an encryption policy:
{
"schema": "...",
"metadata": {...},
"ruleSet": {
"domainRules": [
{
"name": "encryptPII",
"kind": "TRANSFORM",
"type": "ENCRYPT",
"mode": "WRITEREAD",
"tags": ["PII"],
"params": {
"encrypt.kek.name": "<kekName>"
}
}
]
}
}
Note that the encrypt.kek.name parameter is the name under which you registered the KEK in Step 2. If the KEK has not yet been registered, you can optionally specify the KMS key ID and KMS type in the rule, and the client automatically registers the KEK before registering any DEKs:
{
"schema": "...",
"metadata": {...},
"ruleSet": {
"domainRules": [
{
"name": "encryptPII",
"kind": "TRANSFORM",
"type": "ENCRYPT",
"mode": "WRITEREAD",
"tags": ["PII"],
"params": {
"encrypt.kek.name": "<kekName>",
"encrypt.kms.key.id": "<kmsKeyId>",
"encrypt.kms.type": "aws-kms"
}
}
]
}
}
During registration, if the schema is omitted, then the ruleset attaches to the latest schema in the subject.
After registration, include the following properties in the client configuration:
auto.register.schemas=false
use.latest.version=true
If you do not include these properties, the client attempts to register or look up a schema without any existing rules.
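For example, here is a minimal Java sketch of a producer configuration that honors these settings; the bootstrap servers, Schema Registry URL, and credentials are placeholders, and the helper class is invented for illustration:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

public class CsfleProducerConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "pkc-xxxxx.us-east-2.aws.confluent.cloud:9092");
        // Confluent Cloud broker security settings (SASL_SSL, API key)
        // are omitted here for brevity.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url",
                "https://psrc-xxxxx.us-east-2.aws.confluent.cloud");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<sr-api-key>:<sr-api-secret>");
        // Required for CSFLE: use the already-registered schema and its rule set.
        props.put("auto.register.schemas", "false");
        props.put("use.latest.version", "true");
        return props;
    }
}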
Encryption is supported for fields of type string or bytes. Type integer is currently not supported. If a client does not have a rule executor for the ENCRYPT rule type and attempts to consume a message, the message is not decrypted and the client receives the encrypted values.
The following additional properties can be specified for DEKs in the rule parameters:

Parameter | Description
---|---
encrypt.dek.algorithm | The encryption algorithm to use. Valid values are AES128_GCM, AES256_GCM (default), and AES256_SIV. Use AES256_SIV for deterministic encryption.
encrypt.dek.expiry.days | If specified, automatic DEK rotation occurs. When a DEK is older than the expiration period, a new DEK is generated and used for new messages, while previous DEKs remain available to decrypt older messages. The DEK Registry retains a limited number of DEKs (10,000), so use this property cautiously.
preserve.source.fields | For performance reasons, the fields of a message are updated in place during field-level transforms. For field-level encryption, this means that field values are replaced with their encrypted values. To retain the original field values in the message, set this property to true.
Step 5: Configure the KMS key encryption key¶
For Confluent Cloud, the DEK Registry can be granted permission to access your KMS.
To provide proper access for the DEK Registry, obtain the KMS key policy that needs to be added to the key, either through the Confluent Cloud Console or by using the following REST endpoint:
https://psrc-xxxxx.<region>.<provider>.confluent.cloud/dek-registry/v1/policy
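For example, here is a hedged Java sketch that retrieves the policy from this endpoint using java.net.http; the host and the use of basic authentication with a Schema Registry API key are assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class FetchKmsPolicySketch {
    public static void main(String[] args) throws Exception {
        String auth = Base64.getEncoder()
                .encodeToString("<sr-api-key>:<sr-api-secret>".getBytes("UTF-8"));

        // Fetch the KMS key policy to attach to your key; the Schema Registry
        // host below is a placeholder.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://psrc-xxxxx.us-east-2.aws.confluent.cloud"
                        + "/dek-registry/v1/policy"))
                .header("Authorization", "Basic " + auth)
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());  // KMS key policy document
    }
}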
If the DEK Registry has not been granted permission to access the KMS, then the credentials to access the KMS must be specified on the client.
For each of the supported KMS providers, include the corresponding dependency:

AWS KMS:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client-encryption-aws</artifactId>
    <version>7.6.0</version>
</dependency>

Azure Key Vault:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client-encryption-azure</artifactId>
    <version>7.6.0</version>
</dependency>

Google Cloud KMS:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client-encryption-gcp</artifactId>
    <version>7.6.0</version>
</dependency>

HashiCorp Vault:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client-encryption-hcvault</artifactId>
    <version>7.6.0</version>
</dependency>

Local key (for testing only):

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client-encryption</artifactId>
    <version>7.6.0</version>
</dependency>
Next, configure the client with credentials for your KMS provider. For example, for AWS KMS, set the following parameters:
rule.executors._default_.param.access.key.id=<AWS access key>
rule.executors._default_.param.secret.access.key=<AWS secret key>
Alternatively, the AWS access key and AWS secret key can be passed using the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
Now, whenever a message is sent, the ssn field is automatically encrypted before serialization and decrypted after deserialization.
Step 6: Produce with Kafka serializers¶
To produce with Kafka serializers, add the appropriate serializer to the producer properties (see the example after this list):
- KafkaAvroSerializer
- KafkaProtobufSerializer
- KafkaJsonSchemaSerializer
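Here is a hedged Java sketch that produces a record with the tagged ssn field using KafkaAvroSerializer; the topic name is a placeholder and the configuration comes from the hypothetical CsfleProducerConfigSketch helper shown earlier:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CsfleProduceSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":["
                + "{\"name\":\"ssn\",\"type\":\"string\","
                + "\"confluent:tags\":[\"PII\",\"PRIVATE\"]}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("ssn", "123-45-6789");  // encrypted during serialization

        try (KafkaProducer<String, GenericRecord> producer =
                new KafkaProducer<>(CsfleProducerConfigSketch.producerProps())) {
            producer.send(new ProducerRecord<>("my-topic", record));
        }
    }
}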
Handle errors¶
If the Schema Registry client cannot decrypt the encrypted fields, it throws an error. You can configure the client to pass the encrypted data through in spite of the error, or you can send the records to a dead-letter queue (DLQ).
To pass encrypted data through¶
To pass encrypted data through in spite of an error, specify an onFailure property in the rule. Here is an example:
{
"schema": "...",
"metadata": {...},
"ruleSet": {
"domainRules": [
{
"name": "encryptPII",
"kind": "TRANSFORM",
"type": "ENCRYPT",
"mode": "WRITEREAD",
"tags": ["PII"],
"params": {
"encrypt.kek.name": "<name of KEK>"
},
"onFailure": "ERROR,NONE"
}
]
}
}
The onFailure setting value of ERROR,NONE above includes two comma-separated values, the first for encryption and the second for decryption:
- ERROR: If encryption fails, an error is thrown.
- NONE: If decryption fails, the encrypted value is passed through without decryption.
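To illustrate the NONE behavior on the consumer side, here is a hedged Java sketch: a consumer that cannot decrypt (for example, one without KMS credentials) still receives records, with tagged fields left as encrypted values. Hosts, topic, and credentials are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CsfleConsumeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "pkc-xxxxx.us-east-2.aws.confluent.cloud:9092");
        // Confluent Cloud broker security settings omitted for brevity.
        props.put("group.id", "csfle-demo");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url",
                "https://psrc-xxxxx.us-east-2.aws.confluent.cloud");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<sr-api-key>:<sr-api-secret>");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, GenericRecord> records =
                    consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, GenericRecord> r : records.records("my-topic")) {
                // With onFailure ERROR,NONE, ssn is plaintext if decryption
                // succeeded, or the raw encrypted value if it did not.
                System.out.println(r.value().get("ssn"));
            }
        }
    }
}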