Quick Start - Use CSFLE to Protect Sensitive Data

This quick start shows you how to use CSFLE to protect sensitive data in Confluent Cloud on AWS, Azure, and Google Cloud, with examples for AWS KMS, Azure Key Vault, Google Cloud KMS, HashiCorp Vault, and a local key.

Quick start using CSFLE

The following example shows CSFLE with the Avro data format for each of the supported KMS providers.

Step 1 - Configure the KMS provider

Complete the steps for the KMS provider that you want to use.

AWS KMS

  1. Configure AWS KMS. For more information, see AWS KMS. A key-creation sketch follows these steps.

  2. Add the environment variables for your AWS credentials to both the producer and consumer.

export AWS_ACCESS_KEY_ID=XXXX
export AWS_SECRET_ACCESS_KEY=XXXX
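
If you do not already have an AWS KMS key, a minimal sketch for creating one with the AWS CLI looks like this (the description is illustrative; the printed ARN is the value to use for encrypt.kms.key.id in Step 2):

# Create a symmetric KMS key and print its ARN
aws kms create-key --description "CSFLE quick start KEK" \
  --query 'KeyMetadata.Arn' --output text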
Azure Key Vault

  1. Configure Azure Key Vault. For more information, see Azure Key Vault. A key-creation sketch follows these steps.

  2. Add the environment variables for your Azure credentials to both the producer and consumer.

export AZURE_TENANT_ID=XXXX
export AZURE_CLIENT_ID=XXXX
export AZURE_CLIENT_SECRET=XXXX
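
If you do not already have a key, a minimal sketch with the Azure CLI looks like this (the vault and key names are placeholders; the printed key identifier is the value to use for encrypt.kms.key.id in Step 2):

# Create a key in an existing Key Vault and print its key identifier
az keyvault key create --vault-name myvault --name mykey
az keyvault key show --vault-name myvault --name mykey --query key.kid -o tsv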
Google Cloud KMS

  1. Configure Google Cloud KMS. For more information, see Google Cloud KMS. A key-creation sketch follows these steps.

  2. Add the environment variables for your Google Cloud credentials to both the producer and consumer.

export GOOGLE_APPLICATION_CREDENTIALS=PATH_TO_CREDS.json
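
If you do not already have a key, a minimal sketch with the gcloud CLI looks like this (the key ring and key names are placeholders, and your default project is assumed to be set; the resulting resource name, projects/<project>/locations/us-west1/keyRings/mykeyring/cryptoKeys/mykey, is the value to use for encrypt.kms.key.id in Step 2):

# Create a key ring and a symmetric encryption key
gcloud kms keyrings create mykeyring --location us-west1
gcloud kms keys create mykey --location us-west1 --keyring mykeyring --purpose encryption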
HashiCorp Vault

  1. Configure HashiCorp Vault. For more information, see HashiCorp Vault - Getting Started. A dev-server and transit-key sketch follows these steps.

  2. Add the environment variables for your HashiCorp Vault credentials to both the producer and consumer.

export VAULT_TOKEN=dev-only-token
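
For local testing, a minimal sketch that starts a dev-mode Vault server and creates the transit key referenced in Step 2 looks like this (the token and key name match the examples in this quick start, but are otherwise arbitrary):

# Start a dev-mode Vault server with a fixed root token (testing only)
vault server -dev -dev-root-token-id=dev-only-token &

# Point the CLI at the dev server, enable the transit engine, and create the key
export VAULT_ADDR='http://127.0.0.1:8200'
vault secrets enable transit
vault write -f transit/keys/my-key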
    
Local key

  1. Create a local key.

  2. Add the environment variables for your local key to both the producer and consumer.

export LOCAL_SECRET=$(openssl rand -base64 16)
    

Step 2 - Start the producer

To start the producer, run the kafka-avro-console-producer command for the KMS provider that you want to use, where <bootstrap-url> is the bootstrap URL for your Confluent Cloud cluster and <schema-registry-endpoint> is the URL for your Schema Registry instance.

Tip

Specifying encrypt.kms.key.id and encrypt.kms.type as shown in the examples allows the KEK to be auto-registered. This is convenient for a quick start, but it is not recommended for production deployments. For production, create the KEK first and then specify only encrypt.kek.name, as in the sketch below.
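
As a minimal sketch, assuming a KEK named aws-kek1 has already been registered (for example, through the Confluent Cloud Console or the DEK Registry API), the rule set in the producer command then references only the KEK name:

--property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "aws-kek1" }, "onFailure": "ERROR,NONE"}]}'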

AWS KMS

./bin/kafka-avro-console-producer --bootstrap-server <bootstrap-url> \
--property schema.registry.url=<schema-registry-endpoint> \
--topic test \
--producer.config config.properties \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string","confluent:tags":["PII"]}]}' \
--property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "aws-kek1", "encrypt.kms.key.id": "arn:aws:kms:us-east-1:xxxx:key/xxxx", "encrypt.kms.type": "aws-kms" }, "onFailure": "ERROR,NONE"}]}'

Azure Key Vault

./bin/kafka-avro-console-producer --bootstrap-server <bootstrap-url> \
--property schema.registry.url=<schema-registry-endpoint> \
--topic test \
--producer.config config.properties \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string","confluent:tags":["PII"]}]}' \
--property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "azure-kek1", "encrypt.kms.key.id": "https://myvault.vault.azure.net/keys/mykey/xxxx", "encrypt.kms.type": "azure-kms" }, "onFailure": "ERROR,NONE"}]}'

Google Cloud KMS

./bin/kafka-avro-console-producer --bootstrap-server <bootstrap-url> \
--property schema.registry.url=<schema-registry-endpoint> \
--topic test \
--producer.config config.properties \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string","confluent:tags":["PII"]}]}' \
--property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "gcp-kek1", "encrypt.kms.key.id": "projects/myproject/locations/us-west1/keyRings/mykeyring/cryptoKeys/mykey", "encrypt.kms.type": "gcp-kms" }, "onFailure": "ERROR,NONE"}]}'

HashiCorp Vault

./bin/kafka-avro-console-producer --bootstrap-server <bootstrap-url> \
--property schema.registry.url=<schema-registry-endpoint> \
--topic test \
--producer.config config.properties \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string","confluent:tags":["PII"]}]}' \
--property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "hcvault-kek1", "encrypt.kms.key.id": "http://127.0.0.1:8200/transit/keys/my-key", "encrypt.kms.type": "hcvault" }, "onFailure": "ERROR,NONE"}]}'

Local key

./bin/kafka-avro-console-producer --bootstrap-server <bootstrap-url> \
--property schema.registry.url=<schema-registry-endpoint> \
--topic test \
--producer.config config.properties \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string","confluent:tags":["PII"]}]}' \
--property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "local-kek1", "encrypt.kms.key.id": "my-local-key", "encrypt.kms.type": "local-kms" }, "onFailure": "ERROR,NONE"}]}'
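
After the producer starts, enter one or more records that match the schema at the prompt, for example:

{"f1": "foo"}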

Step 3 - Start the consumer with decryption

To start the consumer with decryption, run the following kafka-avro-console-consumer command, where <bootstrap-url> is the bootstrap URL for your Confluent Cloud cluster and <schema-registry-endpoint> is the URL for your Schema Registry instance. The consumer uses the KMS credentials from the environment variables that you set in Step 1 to decrypt the tagged field.

./bin/kafka-avro-console-consumer --bootstrap-server <bootstrap-url> \
  --topic test \
  --from-beginning \
  --property schema.registry.url=<schema-registry-endpoint> \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
  --consumer.config config.properties

The consumer from Step 3 decrypts the tagged field transparently, so a record produced as {"f1": "foo"} is returned in its original plaintext form:

{"f1": "foo"}

To verify that the f1 field is actually stored encrypted on the topic, consume it again without applying the decryption rule; the field value then appears as ciphertext rather than plaintext.