Quick Start - Protect Sensitive Data on Confluent Platform with CSFLE

This quick start shows you how to use client-side field level encryption (CSFLE) to protect sensitive data on Confluent Platform.

Requirements

Before you begin, ensure you have:

  • A running Confluent Platform cluster with:
    • A topic named test (or your chosen topic name)
    • Schema Registry configured and running
    • The necessary credentials to access your cluster:
      • Bootstrap server URL
      • Schema Registry URL
      • Schema Registry API key and secret
  • Access to the Confluent Platform command-line tools (such as kafka-avro-console-producer) from a terminal where you’ll run the commands.
  • The required configuration file config.properties in your working directory.
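
The contents of config.properties depend on how your cluster is secured. As a minimal sketch, a client configuration for a SASL/PLAIN-secured listener might look like the following; the property names are standard Kafka client settings, but the values are placeholders you must replace with your own:

bootstrap.servers=<bootstrap-url>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" \
  password="<api-secret>";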

Quick start using CSFLE

In this example, you use CSFLE with the Avro data format and AWS KMS as the key management service (KMS) provider; the same flow applies to the other supported KMS providers. A KMS is a service that securely creates, stores, and manages the encryption keys used to protect sensitive data. It acts as a trusted source for encryption and decryption keys, allowing applications to encrypt data before sending it to Confluent Platform and to decrypt it when reading.
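
For example, if you don't already have a key in AWS KMS, you can create a symmetric key and note its ARN for use in the encryption rule below. This is a sketch that assumes the AWS CLI is installed and configured; the alias name is illustrative:

aws kms create-key --description "CSFLE quick start key"
aws kms create-alias --alias-name alias/csfle-quickstart --target-key-id <key-id-from-create-key-output>

The create-key response includes the key ARN that you pass later as encrypt.kms.key.id.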

Step 1 - Configure the KMS provider

Complete the following steps to configure AWS KMS, the provider used in this example.

  1. Configure AWS KMS. For more information, see AWS KMS.
  2. Add the environment variables for your AWS credentials to both the producer and consumer.
export AWS_ACCESS_KEY_ID=XXXX
export AWS_SECRET_ACCESS_KEY=XXXX
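
Optionally, before producing, you can confirm that these credentials can reach the key. This check assumes the AWS CLI is installed; <key-arn> is the ARN of the key you use in the rule set below:

aws kms describe-key --key-id <key-arn>

If the command returns the key metadata without an access error, the same credentials will work for the clients' envelope encryption.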

Step 2 - Start the producer

To start the producer, run the kafka-avro-console-producer command, where <bootstrap-url> is the bootstrap server URL for your Confluent Platform cluster and <schema-registry-url> is the URL for your Schema Registry instance. The rule set below references the AWS KMS key that you configured in Step 1.

./bin/kafka-avro-console-producer --bootstrap-server <bootstrap-url> \
  --property schema.registry.url=<schema-registry-url> \
  --topic test \
  --producer.config config.properties \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string","confluent:tags":["PII"]}]}' \
  --property value.rule.set='{ "domainRules": [ { "name": "encryptPII", "type": "ENCRYPT", "tags":["PII"], "params": { "encrypt.kek.name": "aws-kek1", "encrypt.kms.key.id": "arn:aws:kms:us-east-1:xxxx:key/xxxx", "encrypt.kms.type": "aws-kms" }, "onFailure": "ERROR,NONE"}]}'
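
For readability, this is the same rule set expanded; the values match the command above:

{
  "domainRules": [
    {
      "name": "encryptPII",
      "type": "ENCRYPT",
      "tags": ["PII"],
      "params": {
        "encrypt.kek.name": "aws-kek1",
        "encrypt.kms.key.id": "arn:aws:kms:us-east-1:xxxx:key/xxxx",
        "encrypt.kms.type": "aws-kms"
      },
      "onFailure": "ERROR,NONE"
    }
  ]
}

The ENCRYPT rule applies to every field tagged PII. The comma-separated onFailure value sets two actions: ERROR fails the produce if encryption fails, and NONE passes the record through unchanged if decryption fails on the consume side.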

Step 3 - Start the consumer with decryption

To start the consumer with decryption, run the kafka-avro-console-consumer command, where <bootstrap-url> is the bootstrap server URL for your Confluent Platform cluster and <schema-registry-url> is the URL for your Schema Registry instance. The consumer uses the AWS credentials exported in Step 1 to unwrap the data encryption keys.

./bin/kafka-avro-console-consumer --bootstrap-server <bootstrap-url> \
  --topic test \
  --property schema.registry.url=<schema-registry-url> \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET} \
  --consumer.config config.properties

After you run the producer and consumer, you can verify that the data is stored encrypted on the topic by reading it with the plain kafka-console-consumer, which performs no Avro deserialization and applies no decryption rules. The output is raw bytes rather than decoded JSON, and the tagged field contains ciphertext instead of the original value.

./bin/kafka-console-consumer --bootstrap-server <bootstrap-url> \
  --topic test \
  --from-beginning \
  --consumer.config config.properties
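
You can also check that the key encryption key (KEK) was registered. Confluent Platform stores KEK and data encryption key metadata in the DEK Registry that ships with Schema Registry; assuming the DEK Registry endpoint is enabled on your Schema Registry, a request like this lists the registered KEK names:

curl -u ${SR_API_KEY}:${SR_API_SECRET} <schema-registry-url>/dek-registry/v1/keks

The response should include "aws-kek1", the KEK name from the producer's rule set.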

At the producer prompt, enter a test record that matches the schema, for example:

{"f1": "foo"}

The consumer started in Step 3 prints the record back in plaintext:

{"f1": "foo"}