Protect Sensitive Data Using Client-Side Payload Encryption on Confluent Platform
Use Client-Side Payload Encryption (CSPE) to easily encrypt entire messages at the payload or message level. This feature provides comprehensive security assurance without the operational burden of field-by-field encryption.
Important
CSPE is currently offered to a limited set of customers through a private Limited Availability (LA) program.
- Features
Encrypt the entire message payload with a single rule, reducing the operational complexity and risk of exposing sensitive data.
Meet stringent compliance requirements by ensuring all data is encrypted by default, minimizing data classification burdens.
Maintain uniform encryption practices across your platforms.
Bring all of your data, even the “all sensitive” payloads without clear schemas, into Confluent Platform.
With CSPE you can define an encryption rule at the schema level to encrypt the entire message payload.
CSPE is enabled by specifying an encoding rule. Domain rules are run before serialization and after deserialization. Encoding rules are run after serialization and before deserialization.
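The ordering of domain rules and encoding rules can be sketched as follows. This is an illustrative model only, not the Confluent client implementation: `write_path`, `read_path`, and `trim_name` are hypothetical names, JSON stands in for the schema serializer, and base64 stands in for a reversible encoding rule such as payload encryption.

```python
import base64
import json


def write_path(record, domain_rules, encoding_rules):
    # Domain rules see the structured record, before serialization.
    for rule in domain_rules:
        record = rule(record)
    payload = json.dumps(record, sort_keys=True).encode("utf-8")
    # Encoding rules see only the serialized bytes.
    for rule in encoding_rules:
        payload = rule(payload)
    return payload


def read_path(payload, encoding_rules, domain_rules):
    # Encoding rules run first, on bytes, before deserialization.
    for rule in encoding_rules:
        payload = rule(payload)
    record = json.loads(payload.decode("utf-8"))
    # Domain rules run last, on the structured record.
    for rule in domain_rules:
        record = rule(record)
    return record


def trim_name(record):
    # Hypothetical domain rule: it needs access to individual fields.
    return {**record, "name": record["name"].strip()}


# base64 is only a stand-in for a reversible byte-level transform.
wire = write_path({"name": " Ada ", "id": 7}, [trim_name], [base64.b64encode])
restored = read_path(wire, [base64.b64decode], [])
```

Because the encoding rule only ever sees bytes, it needs no knowledge of the schema's fields, which is why a single rule can cover the whole payload.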
- Envelope encryption
CSPE and Client-Side Field Level Encryption use envelope encryption.
A Data Encryption Key (DEK) is used to encrypt the plaintext data.
A Key Encryption Key (KEK) is used to encrypt the DEK. The KEK is required to decrypt the data.
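The key hierarchy can be illustrated with a minimal sketch. It uses only the Python standard library, so `toy_cipher` is a deliberately insecure stand-in for a real algorithm such as AES-GCM, and both keys are generated locally, whereas in practice the KEK stays in the KMS. All names here are hypothetical.

```python
import hashlib
import secrets


def toy_cipher(key: bytes, data: bytes) -> bytes:
    """XOR data with a SHA-256-derived keystream.

    NOT secure: a stand-in for a real cipher, used only to show the flow.
    XOR is its own inverse, so the same function encrypts and decrypts.
    """
    stream = b""
    counter = 0
    while len(stream) < len(data):
        stream += hashlib.sha256(key + counter.to_bytes(4, "big")).digest()
        counter += 1
    return bytes(a ^ b for a, b in zip(data, stream))


message = b'{"card": "4111-1111"}'

kek = secrets.token_bytes(32)  # assumption: held by the KMS in a real setup
dek = secrets.token_bytes(32)  # generated for the subject

# Write side: the DEK encrypts the data, the KEK encrypts (wraps) the DEK.
ciphertext = toy_cipher(dek, message)
wrapped_dek = toy_cipher(kek, dek)

# Read side: unwrap the DEK with the KEK, then decrypt the data.
recovered_dek = toy_cipher(kek, wrapped_dek)
recovered = toy_cipher(recovered_dek, ciphertext)
```

Only `wrapped_dek` and `ciphertext` need to be stored or transmitted; without access to the KEK, neither can be turned back into plaintext.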
- Key ownership and management
Key Encryption Keys (KEKs): Users retain ownership and management of their KEKs (master keys) in their external Key Management System (KMS). Confluent interacts with the KEK via API calls for encryption/decryption of the DEK but never directly accesses or persists the KEKs. Customers can rotate KEKs through their KMS.
Data Encryption Keys (DEKs): DEKs are generated and managed by Confluent, scoped by subject, and stored as encrypted DEKs within the Schema Registry instance.
- KEK access options and impact on managed services
You must decide whether to grant Confluent access to your KEK. This decision determines the functionality available from fully managed services.
No Confluent access to KEK
Access level: No user or application in Confluent Cloud can access the encrypted data in plaintext.
Service limitation: Managed services like Flink or Connect cannot access the unencrypted data. Stream processing on encrypted fields is not possible.
Client complexity: Users must configure producers and consumers to connect directly to the KMS to access KEKs.
Confluent granted access to KEK
Managed services enabled: Sharing the KEK means Confluent can facilitate encryption and decryption operations.
Future functionality: New features are automatically enabled when the KEK is shared with Confluent.
Simpler client setup: Less configuration is required for producers and consumers.
- Availability and packaging
CSPE is an entitlement available only as part of the package.
Client support: CSPE supports Avro, Protobuf, and JSON Schema formats. Supported client languages include Java, C#/.NET, Go, Python, JavaScript, and C++.
Configuration interface: Deployment is available through API with CLI and Terraform.
CSPE Quick start
Create a KEK using your key management service (KMS).
For the steps to create a KEK, see Create a KEK.
Register the KEK with the DEK Registry using Confluent Cloud Console, Confluent CLI, or REST APIs.
Note that if the KEK is not registered beforehand, the client can register it on demand, provided that the client has the appropriate permissions on the DEK Registry.
For the steps to register a KEK with the DEK Registry, see Register a KEK with the DEK Registry.
Define an encryption policy that specifies the rules to use for encryption. The encryption policy is defined in a JSON file that is then uploaded to Confluent Platform. Here is an example of an encryption policy:
{
  "ruleSet": {
    "encodingRules": [
      {
        "name": "encryptPayload",
        "kind": "TRANSFORM",
        "type": "ENCRYPT_PAYLOAD",
        "mode": "WRITEREAD",
        "params": {
          "encrypt.kek.name": "<kek-name>"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}
Note that you specified the name of the KEK in step 1. If the KEK has not yet been registered, you can optionally specify the KMS key ID and KMS type in the rule. The client automatically registers the KEK before registering any DEKs.
{
  "schema": "...",
  "metadata": {...},
  "ruleSet": {
    "encodingRules": [
      {
        "name": "encryptPayload",
        "kind": "TRANSFORM",
        "type": "ENCRYPT_PAYLOAD",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
          "encrypt.kek.name": "<kekName>",
          "encrypt.kms.key.id": "<kmsKeyId>",
          "encrypt.kms.type": "<kmsType>"
        }
      }
    ]
  }
}
During registration, if the schema is omitted, then the ruleset attaches to the latest schema in the subject.
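As a rough sketch of the registration step, the following builds (but does not send) a request that posts a rule set without a schema. It assumes the standard Schema Registry endpoint for registering under a subject; the base URL `http://localhost:8081`, the subject `orders-value`, the KEK name `my-kek`, and the helper name `build_ruleset_request` are all illustrative assumptions.

```python
import json
import urllib.parse
import urllib.request


def build_ruleset_request(base_url, subject, kek_name):
    """Build a POST request that attaches a payload-encryption rule set."""
    rule_set = {
        "encodingRules": [
            {
                "name": "encryptPayload",
                "kind": "TRANSFORM",
                "type": "ENCRYPT_PAYLOAD",
                "mode": "WRITEREAD",
                "params": {"encrypt.kek.name": kek_name},
                "onFailure": "ERROR,NONE",
            }
        ]
    }
    # The schema is omitted so the rule set attaches to the latest schema.
    body = json.dumps({"ruleSet": rule_set}).encode("utf-8")
    quoted_subject = urllib.parse.quote(subject, safe="")
    url = f"{base_url.rstrip('/')}/subjects/{quoted_subject}/versions"
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


# Assumed values for illustration only.
request = build_ruleset_request("http://localhost:8081", "orders-value", "my-kek")
# urllib.request.urlopen(request) would send it, with authentication added
# as your deployment requires.
```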
After registration, you need to include the following in the client:
auto.register.schemas=false
use.latest.version=true
If you do not include these properties, the client attempts to register, or look up, a schema without any existing rules.
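One way to guard against misconfiguration is to assemble these two settings in a single place and fail fast if either is overridden. The sketch below is only an illustration; `serializer_settings` and `to_properties` are hypothetical helpers, not part of any Confluent client.

```python
def serializer_settings(overrides=None):
    """Return serializer settings, insisting on the two values CSPE needs."""
    settings = {"auto.register.schemas": False, "use.latest.version": True}
    settings.update(overrides or {})
    if settings["auto.register.schemas"] or not settings["use.latest.version"]:
        raise ValueError(
            "CSPE rules are only picked up with auto.register.schemas=false "
            "and use.latest.version=true"
        )
    return settings


def to_properties(settings):
    """Render settings in Java properties style, one key=value per line."""
    lines = []
    for key, value in sorted(settings.items()):
        text = str(value).lower() if isinstance(value, bool) else str(value)
        lines.append(f"{key}={text}")
    return "\n".join(lines)


properties_text = to_properties(serializer_settings())
```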
You can specify these properties for DEKs in the rule parameters:
encrypt.dek.algorithm: The encryption algorithm being used. Valid values include AES128_GCM, AES256_GCM (default), or AES256_SIV.
encrypt.dek.expiry.days: If specified, automatic DEK rotation occurs. When a DEK is older than the expiration period, a new DEK is generated and used for new messages, while previous DEKs are available to decrypt older messages. There is a limit to the number of existing DEKs (10,000) that can be retained in the DEK Registry, so use this property cautiously.
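The rotation behavior can be modeled with a short sketch. It is a simplified illustration under assumed names (`Dek`, `dek_for_write`), not the DEK Registry's actual logic: the newest DEK is used for writes, and a new one is created once the newest is older than the expiry period.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Dek:
    version: int
    created_at: datetime


def dek_for_write(deks, expiry_days, now):
    """Return the DEK to use for new messages, rotating if the latest expired."""
    latest = max(deks, key=lambda dek: dek.version)
    if now - latest.created_at > timedelta(days=expiry_days):
        latest = Dek(version=latest.version + 1, created_at=now)
        deks.append(latest)  # older DEKs stay available for decryption
    return latest


deks = [Dek(version=1, created_at=datetime(2024, 1, 1, tzinfo=timezone.utc))]

# 14 days old with a 30-day expiry: the existing DEK is reused.
early = dek_for_write(deks, 30, datetime(2024, 1, 15, tzinfo=timezone.utc))

# 60 days old: a new DEK version is created for new messages.
late = dek_for_write(deks, 30, datetime(2024, 3, 1, tzinfo=timezone.utc))
```

Since every rotation adds a retained DEK, a short expiry period increases the number of stored DEKs more quickly.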
Configure the KMS key encryption key. For Confluent Platform, the DEK Registry can be granted permission to access your KMS.
To provide proper access for the DEK Registry, obtain the KMS key policy that needs to be added to the key, either through the Confluent Cloud Console or by using the following REST endpoint:
https://psrc-xxxxx.<region>.<provider>.confluent.cloud/dek-registry/v1/policy
If the DEK Registry has not been granted permission to access the KMS, then the credentials to access the KMS must be specified on the client.
For each of the supported KMS providers, the following dependencies are required, where <cp-version> is your Confluent Platform version:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client-encryption-aws</artifactId>
  <version><cp-version></version>
</dependency>

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client-encryption-azure</artifactId>
  <version><cp-version></version>
</dependency>

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client-encryption-gcp</artifactId>
  <version><cp-version></version>
</dependency>

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client-encryption-hcvault</artifactId>
  <version><cp-version></version>
</dependency>

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client-encryption</artifactId>
  <version><cp-version></version>
</dependency>
Next, configure the following parameters on the clients for your KMS provider.
AWS KMS:
rule.executors._default_.param.access.key.id=<AWS access key>
rule.executors._default_.param.secret.access.key=<AWS secret key>
Alternatively, the AWS access key and AWS secret key can be passed using environment variables named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
Azure Key Vault:
rule.executors._default_.param.tenant.id=<tenant ID>
rule.executors._default_.param.client.id=<client ID>
rule.executors._default_.param.client.secret=<secret value>
Google Cloud KMS:
rule.executors._default_.param.client.id=<GCP Client ID>
rule.executors._default_.param.client.email=<GCP Client Email>
rule.executors._default_.param.private.key.id=<GCP Private Key ID>
rule.executors._default_.param.private.key=<GCP Private Key Material>
HashiCorp Vault:
rule.executors._default_.param.token.id=root-token
Local KMS:
rule.executors._default_.param.secret=<output of "openssl rand -base64 16">
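To keep credentials out of source files, the client parameters can be assembled at startup from values loaded elsewhere. The sketch below is a hypothetical helper: the provider labels (`aws`, `azure`, `gcp`, `hcvault`, `local`) are names chosen for this example and are not necessarily valid `encrypt.kms.type` values, while the parameter suffixes are taken from the settings listed above.

```python
PREFIX = "rule.executors._default_.param."

# Parameter suffixes per provider, copied from the client settings above.
REQUIRED_PARAMS = {
    "aws": ["access.key.id", "secret.access.key"],
    "azure": ["tenant.id", "client.id", "client.secret"],
    "gcp": ["client.id", "client.email", "private.key.id", "private.key"],
    "hcvault": ["token.id"],
    "local": ["secret"],
}


def kms_client_params(provider, values):
    """Return fully prefixed client properties for one provider."""
    names = REQUIRED_PARAMS[provider]
    missing = [name for name in names if name not in values]
    if missing:
        raise ValueError(f"missing {provider} parameters: {missing}")
    return {PREFIX + name: values[name] for name in names}


# Placeholder values; real ones would come from a secrets store.
azure_params = kms_client_params(
    "azure",
    {"tenant.id": "<tenant ID>", "client.id": "<client ID>", "client.secret": "<secret value>"},
)
```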
Now, whenever a message is sent, the entire payload is automatically encrypted after serialization and decrypted before deserialization.
Produce with Kafka serializers. Add the appropriate serializer to the producer properties:
KafkaAvroSerializer
KafkaProtobufSerializer
KafkaJsonSchemaSerializer
Example
Here is an example of a rule set that contains a CSPE rule. Note that CSPE uses the ENCRYPT_PAYLOAD rule type, rather than the ENCRYPT rule type that Client-Side Field Level Encryption (CSFLE) uses.
{
  "ruleSet": {
    "encodingRules": [
      {
        "name": "encryptPayload",
        "kind": "TRANSFORM",
        "type": "ENCRYPT_PAYLOAD",
        "mode": "WRITEREAD",
        "params": {
          "encrypt.kek.name": "local-kek1",
          "encrypt.kms.key.id": "mykey",
          "encrypt.kms.type": "local-kms"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}
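Because the two rule types are easy to confuse, a small check before uploading a rule set can help. The following is a sketch with a hypothetical helper, `classify_rules`; it assumes that CSFLE rules appear under `domainRules`, while CSPE rules appear under `encodingRules` as in the example above.

```python
import json

EXAMPLE = """
{
  "ruleSet": {
    "encodingRules": [
      {
        "name": "encryptPayload",
        "kind": "TRANSFORM",
        "type": "ENCRYPT_PAYLOAD",
        "mode": "WRITEREAD",
        "params": {
          "encrypt.kek.name": "local-kek1",
          "encrypt.kms.key.id": "mykey",
          "encrypt.kms.type": "local-kms"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}
"""


def classify_rules(rule_set):
    """Return (feature, rule name) pairs for the encryption rules found."""
    found = []
    for rule in rule_set.get("encodingRules", []):
        if rule.get("type") == "ENCRYPT_PAYLOAD":
            found.append(("CSPE", rule["name"]))
    # Assumption: field-level rules are registered as domain rules.
    for rule in rule_set.get("domainRules", []):
        if rule.get("type") == "ENCRYPT":
            found.append(("CSFLE", rule["name"]))
    return found


rules = classify_rules(json.loads(EXAMPLE)["ruleSet"])
```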