Protect Sensitive Data Using Client-Side Payload Encryption on Confluent Cloud

Use Client-Side Payload Encryption (CSPE) to encrypt entire messages at the payload level. Because the whole payload is encrypted, you get broad protection without the operational burden of field-by-field encryption.

Features:

  • Encrypt the entire message payload with a single rule, reducing the operational complexity and risk of exposing sensitive data.

  • Meet stringent compliance requirements by ensuring all data is encrypted by default, minimizing data classification burdens.

  • Maintain uniform encryption practices across your platforms.

  • Bring all of your data into Confluent Cloud, including “all sensitive” payloads that lack clear schemas.

With CSPE you can define an encryption rule at the schema level to encrypt the entire message payload.

CSPE is enabled by specifying an encoding rule. Domain rules are run before serialization and after deserialization. Encoding rules are run after serialization and before deserialization.
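
The ordering described above can be illustrated with a toy pipeline. This is only a sketch: the class and method names are hypothetical, no Confluent classes are involved, and Base64 stands in for the encoding rule purely to show where a byte-level transform sits. It is not encryption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/** Toy pipeline that records the order in which each stage runs. Not Confluent code. */
class RuleOrderSketch {
    static final List<String> trace = new ArrayList<>();

    // Stand-in for a domain rule: works on the typed message (here, a String).
    static String domainRule(String message) {
        trace.add("domain");
        return message;
    }

    // Stand-ins for the serializer and deserializer.
    static byte[] serialize(String message) {
        trace.add("serialize");
        return message.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] bytes) {
        trace.add("deserialize");
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Stand-ins for an encoding rule: works on the serialized bytes.
    // Base64 is a placeholder transform, NOT encryption.
    static byte[] encodingRuleOnWrite(byte[] payload) {
        trace.add("encoding");
        return Base64.getEncoder().encode(payload);
    }

    static byte[] encodingRuleOnRead(byte[] payload) {
        trace.add("encoding");
        return Base64.getDecoder().decode(payload);
    }

    // Write path: domain rules, then serialization, then encoding rules.
    static byte[] write(String message) {
        return encodingRuleOnWrite(serialize(domainRule(message)));
    }

    // Read path: encoding rules, then deserialization, then domain rules.
    static String read(byte[] wire) {
        return domainRule(deserialize(encodingRuleOnRead(wire)));
    }

    public static void main(String[] args) {
        String roundTripped = read(write("hello"));
        System.out.println(trace + " -> " + roundTripped);
    }
}
```

Because the encoding rule only ever sees serialized bytes, it can transform the whole payload without knowing anything about individual fields.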

Use CSPE

Prerequisite:

The Stream Governance Advanced package must be enabled.

  1. Create a KEK using your key management service (KMS).

    For the steps to create a KEK, see Create a KEK.

  2. Register the KEK with the DEK Registry using Confluent Cloud Console, Confluent CLI, or REST APIs.

    Note that if the KEK is not registered beforehand, the client can register it on demand, provided that the client has the appropriate permissions with the DEK Registry.

    For the steps to register a KEK with the DEK Registry, see Register a KEK with the DEK Registry.

  3. Define an encryption policy that specifies the rule to use for encryption. The encryption policy is defined in a JSON file that is then uploaded to Confluent Cloud. Here is an example of an encryption policy:

    {
      "ruleSet": {
        "encodingRules": [
          {
            "name": "encryptPayload",
            "kind": "TRANSFORM",
            "type": "ENCRYPT_PAYLOAD",
            "mode": "WRITEREAD",
            "params": {
              "encrypt.kek.name": "<kek-name>"
            },
            "onFailure": "ERROR,NONE"
          }
        ]
      }
    }
    

    The rule references the KEK by the name you gave it earlier. If the KEK has not yet been registered, you can optionally specify the KMS key ID and KMS type in the rule. The client then registers the KEK automatically before registering any DEKs.

    {
      "schema": "...",
      "metadata": {...},
      "ruleSet": {
        "encodingRules": [
          {
            "name": "encryptPayload",
            "kind": "TRANSFORM",
            "type": "ENCRYPT_PAYLOAD",
            "mode": "WRITEREAD",
            "params": {
              "encrypt.kek.name": "<kekName>",
              "encrypt.kms.key.id": "<kmsKeyId>",
              "encrypt.kms.type": "<kmsType>"
            }
          }
        ]
      }
    }
    

    During registration, if the schema is omitted, then the ruleset attaches to the latest schema in the subject.

    After registration, you need to include the following in the client:

    • auto.register.schemas=false

    • use.latest.version=true

    If you do not include these properties, the client attempts to register, or look up, a schema without any existing rules.

    You can specify these properties for DEKs in the rule parameters:

    encrypt.dek.algorithm

    The encryption algorithm to use. Valid values are AES128_GCM, AES256_GCM (default), and AES256_SIV.

    encrypt.dek.expiry.days

    If specified, automatic DEK rotation occurs. When a DEK is older than the expiration period, a new DEK is generated and used for new messages, while previous DEKs are available to decrypt older messages. There is a limit to the number of existing DEKs (10,000) that can be retained in the DEK Registry, so use this property cautiously.
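
To make the rotation condition concrete, here is a minimal sketch of the "older than the expiration period" check. The class name, method name, and the treatment of a non-positive value as "no rotation" are assumptions made for illustration. The actual client logic may differ.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper illustrating the DEK age check; not part of any Confluent library. */
class DekExpirySketch {

    /**
     * Returns true when a DEK created at createdAt is older than expiryDays at time now,
     * meaning a new DEK would be generated for new messages.
     */
    static boolean isExpired(Instant createdAt, int expiryDays, Instant now) {
        if (expiryDays <= 0) {
            // Assumption: no expiry configured means the DEK is never rotated.
            return false;
        }
        Duration age = Duration.between(createdAt, now);
        return age.compareTo(Duration.ofDays(expiryDays)) > 0;
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2024-01-01T00:00:00Z");
        Instant later = Instant.parse("2024-02-15T00:00:00Z");
        System.out.println("Rotate after 30 days? " + isExpired(created, 30, later));
    }
}
```

A shorter expiry period produces more DEKs over time, which is why the retention limit mentioned above matters when choosing a value.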

  4. Configure the KMS key encryption key. For Confluent Cloud, the DEK Registry can be granted permission to access your KMS.

    To provide proper access for the DEK Registry, obtain the KMS key policy that needs to be added to the key, either through the Confluent Cloud Console or by using the following REST endpoint:

    https://psrc-xxxxx.<region>.<provider>.confluent.cloud/dek-registry/v1/policy
    

    If the DEK Registry has not been granted permission to access the KMS, then the credentials to access the KMS must be specified on the client.

    For each of the supported KMS providers, the following dependencies are required, where <cp-version> is your Confluent Platform version:

    <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-schema-registry-client-encryption-aws</artifactId>
       <version><cp-version></version>
    </dependency>
    
    <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-schema-registry-client-encryption-azure</artifactId>
       <version><cp-version></version>
    </dependency>
    
    <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-schema-registry-client-encryption-gcp</artifactId>
       <version><cp-version></version>
    </dependency>
    
    <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-schema-registry-client-encryption-hcvault</artifactId>
       <version><cp-version></version>
    </dependency>
    
    <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-schema-registry-client-encryption</artifactId>
       <version><cp-version></version>
    </dependency>
    

    Next, configure the parameters for your KMS provider on the clients.

    # AWS KMS
    rule.executors._default_.param.access.key.id=<AWS access key>
    rule.executors._default_.param.secret.access.key=<AWS secret key>
    

    Alternatively, the AWS access key and AWS secret key can be passed using the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

    # Azure Key Vault
    rule.executors._default_.param.tenant.id=<tenant ID>
    rule.executors._default_.param.client.id=<client ID>
    rule.executors._default_.param.client.secret=<secret value>
    
    # Google Cloud KMS
    rule.executors._default_.param.client.id=<GCP Client ID>
    rule.executors._default_.param.client.email=<GCP Client Email>
    rule.executors._default_.param.private.key.id=<GCP Private Key ID>
    rule.executors._default_.param.private.key=<GCP Private Key Material>
    
    # HashiCorp Vault
    rule.executors._default_.param.token.id=root-token
    
    # Local KMS
    rule.executors._default_.param.secret=<output of "openssl rand -base64 16">

    Now, whenever a message is sent, the entire payload is automatically encrypted after serialization and decrypted before deserialization.
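
The KMS parameters above all share the rule.executors._default_.param. prefix. As a minimal sketch, the helper below adds that prefix to provider-specific keys when building client properties in code. The class and method names are hypothetical, and the credential values are placeholders. In practice, load secrets from a secure source rather than hard-coding them.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper that prefixes KMS parameters for the default rule executor. */
class KmsClientParamsSketch {
    static final String PREFIX = "rule.executors._default_.param.";

    /** Returns a copy of base with each KMS parameter added under the executor prefix. */
    static Properties withKmsParams(Properties base, Map<String, String> kmsParams) {
        Properties out = new Properties();
        out.putAll(base);
        kmsParams.forEach((key, value) -> out.setProperty(PREFIX + key, value));
        return out;
    }

    public static void main(String[] args) {
        // Placeholder values only; these mirror the AWS parameter names shown above.
        Properties props = withKmsParams(new Properties(), Map.of(
                "access.key.id", "<AWS access key>",
                "secret.access.key", "<AWS secret key>"));
        props.stringPropertyNames().forEach(System.out::println);
    }
}
```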

  5. Produce with Kafka serializers. Add the appropriate serializer to the producer properties:

    • KafkaAvroSerializer

    • KafkaProtobufSerializer

    • KafkaJsonSchemaSerializer
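
A minimal sketch of producer properties that combine a serializer with the schema settings from step 3 might look like the following. The class name, method names, and format labels are hypothetical. Connection security settings for Confluent Cloud (for example, SASL credentials and Schema Registry authentication) are omitted and would be needed in a real client.

```java
import java.util.Properties;

/** Hypothetical builder for producer properties; uses only java.util.Properties. */
class CspeProducerConfigSketch {

    /** Maps an illustrative format label to the fully qualified Confluent serializer class name. */
    static String serializerFor(String format) {
        switch (format) {
            case "AVRO":
                return "io.confluent.kafka.serializers.KafkaAvroSerializer";
            case "PROTOBUF":
                return "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer";
            case "JSON":
                return "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer";
            default:
                throw new IllegalArgumentException("Unsupported format: " + format);
        }
    }

    static Properties producerProps(String bootstrapServers, String schemaRegistryUrl, String format) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", serializerFor(format));
        // Use the registered schema and its rules instead of registering a new schema.
        props.setProperty("auto.register.schemas", "false");
        props.setProperty("use.latest.version", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("<bootstrap-servers>", "<schema-registry-url>", "AVRO");
        props.stringPropertyNames().stream().sorted()
                .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```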

Example

Here is an example of a rule set that contains a CSPE rule. Note that CSPE uses the ENCRYPT_PAYLOAD rule type, rather than the ENCRYPT rule type that client-side field level encryption (CSFLE) uses.

{
  "ruleSet": {
    "encodingRules": [
      {
        "name": "encryptPayload",
        "kind": "TRANSFORM",
        "type": "ENCRYPT_PAYLOAD",
        "mode": "WRITEREAD",
        "params": {
          "encrypt.kek.name": "local-kek1",
          "encrypt.kms.key.id": "mykey",
          "encrypt.kms.type": "local-kms"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}
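
As one possible way to upload a rule set like this, the sketch below builds (but does not send) an HTTP request to the Schema Registry subject-versions endpoint using the JDK HTTP client. The class name, subject name, base URL, and credentials are placeholders, and the use of basic authentication with an API key and secret is an assumption. Check the Schema Registry API reference for your environment before relying on it.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical sketch: builds a request that attaches a rule set to a subject. */
class RegisterRuleSetSketch {

    static HttpRequest buildRequest(String baseUrl, String subject,
                                    String apiKey, String apiSecret, String body) {
        // Assumption: Schema Registry accepts HTTP basic auth with an API key and secret.
        String credentials = Base64.getEncoder()
                .encodeToString((apiKey + ":" + apiSecret).getBytes(StandardCharsets.UTF_8));
        // The subject is used as-is; subjects with special characters would need encoding.
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .header("Authorization", "Basic " + credentials)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // With the schema omitted, the rule set attaches to the latest schema in the subject.
        String body = "{\"ruleSet\":{\"encodingRules\":[{"
                + "\"name\":\"encryptPayload\",\"kind\":\"TRANSFORM\","
                + "\"type\":\"ENCRYPT_PAYLOAD\",\"mode\":\"WRITEREAD\","
                + "\"params\":{\"encrypt.kek.name\":\"<kek-name>\"},"
                + "\"onFailure\":\"ERROR,NONE\"}]}}";
        HttpRequest request = buildRequest("https://schema-registry.example.com",
                "orders-value", "<api-key>", "<api-secret>", body);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send the request, pass it to java.net.http.HttpClient and inspect the response status and body.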