Quick Start - Use CSFLE to Protect Sensitive Data

This quick start shows you how to use CSFLE to protect sensitive data in Confluent Cloud on AWS, Azure, and Google Cloud.


Quick start using CSFLE

Here is an example of CSFLE using the Avro data format for each of the KMS providers.

Step 1 - Configure the KMS provider

Complete the steps for the KMS provider that you want to use.

  1. Configure AWS KMS. For more information, see AWS KMS.
  2. Add the environment variables for your AWS credentials to both the producer and consumer.

Step 2 - Start the producer

To start the producer, run the kafka-avro-console-producer command for the KMS provider that you want to use, where <bootstrap-url> is the bootstrap URL for your Confluent Cloud cluster and <schema-registry-url> is the URL for your Schema Registry instance.

./bin/kafka-avro-console-producer --bootstrap-server <bootstrap-url> \
  --property schema.registry.url=<schema-registry-url> \
  --topic test  \
  --producer.config config.properties \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string","confluent:tags":["PII"]}]}' \
  --property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "aws-kek1", "encrypt.kms.key.id": "arn:aws:kms:us-east-1:xxxx:key/xxxx", "encrypt.kms.type": "aws-kms" }, "onFailure": "ERROR,NONE"}]}'

Step 3 - Start the consumer with decryption

To start the consumer with decryption, run the kafka-avro-console-consumer command for the KMS provider that you want to use, where <bootstrap-url> is the bootstrap URL for your Confluent Cloud cluster.

./bin/kafka-avro-console-consumer --bootstrap-server <bootstrap-url> \
  --topic test  \
  --property schema.registry.url=<schema-registry-url> \
  --consumer.config config.properties

After you run the producer and consumer, you can verify that the data is encrypted and decrypted by using the kafka-configs --describe command for the topic.

kafka-configs --bootstrap-server <bootstrap-url> \
  --entity-type topics \
  --entity-name test \

Example test record should look like this:

{"f1": "foo"}

{"f1": "foo", "f2": {"string": "bar"}}