Manage CSFLE in Confluent Platform for Self-Managed Connectors

Client-Side Field Level Encryption (CSFLE) is a security feature that allows you to safeguard sensitive data, such as personally identifiable information (PII), by enabling field-level encryption both at the producer and consumer levels. By encrypting and decrypting individual fields within your data, CSFLE ensures that access to sensitive information is tightly controlled, granting only authorized stakeholders access to the data they are permitted to see. For more information, see Protect Sensitive Data Using CSFLE on Confluent Platform.

Important

For self-managed connectors on Confluent Platform, CSFLE is supported only on Confluent Enterprise 8.0 and later.

Limitations

CSFLE for self-managed connectors has the following limitations:

  • Auto registration of schemas is not supported when using CSFLE.
  • Schemas need to be manually registered upfront before creating connectors.
  • The connector only supports encryption for fields of type string or bytes.
  • Encryption does not work for string and bytes fields within nested JSON_SR structures when value.converter.decimal.format is set to BASE64. To work around this limitation, set value.converter.decimal.format to NUMERIC.
  • Reporter topics are not covered by CSFLE. When using reporter topics, ensure that error and success responses do not contain sensitive information.
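
Because auto registration is disabled under CSFLE, register each schema before creating the connector — for example, through the Schema Registry REST API. The subject name, schema, and Schema Registry URL below are placeholders; adjust them for your environment:

```shell
# Register a value schema for the topic "orders" upfront
# (subject name, schema, and URL are illustrative placeholders).
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"card_number\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/subjects/orders-value/versions
```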

Supported connectors

The following table lists the connectors that support CSFLE and the minimum supported version of each.

Connector Minimum supported version
ActiveMQ Source 12.2.9
Amazon CloudWatch Metrics Sink 2.0.1
Amazon DynamoDB Sink 1.4.1
Amazon Kinesis Source 1.3.27
Amazon Redshift Sink 1.2.6
Amazon S3 Sink 10.6.0
Amazon S3 Source 2.6.10
Amazon SQS Source 2.0.3
Apache Kudu (Source and Sink) 1.0.5
AWS Lambda Sink 2.0.10
Azure Blob Storage Source 2.6.10
Azure Blob Storage Sink 1.6.27
Azure Data Lake Storage Gen2 Sink 1.6.27
Azure Cognitive Search Sink 1.1.7
Kafka Connect for Azure Cosmos DB (Source and Sink) 1.17.0
Azure Functions Sink 2.0.4
Azure Synapse Analytics Sink 1.0.9
Google BigQuery Sink 2.5.7
Cassandra Sink 2.0.10
Data Diode (Source and Sink) 1.2.6
Databricks Delta Lake Sink for AWS 1.0.19
Datadog Logs Sink 1.3.0
Debezium connector for MySQL 2.4.2
Debezium connector for PostgreSQL 2.5.4
Debezium connector for SQL Server 2.5.4
Elasticsearch Sink 14.1.2
Google Cloud Functions Sink 1.2.4
Google Cloud Storage Sink 10.2.1
Google Cloud Storage Source 10.2.1
GitHub Source 2.1.8
Google Cloud Pub/Sub Source 1.2.9
Google Cloud Spanner Sink 1.0.16
Google Firebase Realtime Database Connector (Source and Sink) 1.2.6
HDFS 3 Source 2.6.10
HDFS 3 Sink 1.2.4
HEAVY-AI (formerly OmniSci) Sink 1.0.9
HTTP Sink 1.7.8
HTTP Source 0.2.5
IBM MQ Sink 2.1.15
IBM MQ Source 12.2.9
InfluxDB Source 1.2.11
JDBC (Source and Sink) 10.8.2
Jira Source 1.2.13
JMS Sink Connector 2.1.15
JMS Source Connector 12.2.9
MongoDB Atlas Sink 1.15.0
Netezza Sink Connector 1.0.7
Oracle XStream CDC Source 1.1.0
PagerDuty Sink [Deprecated] 1.0.10
Redis Sink 0.0.8
Salesforce Bulk API (Source and Sink) 2.0.25
Salesforce (Source and Sink) 2.0.25
ServiceNow (Source and Sink) 2.5.4
SFTP (Source and Sink) 3.2.11
SNMP Trap Source 1.3.2
Snowflake Sink 3.1.1
Solace Source 1.2.8
Solace Sink 2.1.15
Splunk S2S Source 2.2.1
Splunk Source Connector 1.1.5
Kafka Connect Spooldir 2.0.67
Syslog Source 1.5.10
TIBCO EMS Sink 2.1.15
TIBCO EMS Source 1.2.9
Vertica Sink 1.3.2
VMware Tanzu Gemfire Sink 1.0.18
Zendesk Source Connector 1.3.4

Requirements

To use CSFLE in Confluent Platform with self-managed connectors, you must meet the following requirements:

  • An installation of Confluent Enterprise 8.0 or later with the CSFLE Add-On enabled.

  • Ensure Schema Registry is configured with the following properties before it starts:

    resource.extension.class=io.confluent.kafka.schemaregistry.rulehandler.RuleSetResourceExtension,io.confluent.dekregistry.DekRegistryResourceExtension
    confluent.license=<cpe-license-key>
    confluent.license.addon.csfle=<cpe-license-key>
    

    Note

    The value for confluent.license.addon.csfle is the same as your main confluent.license key.

  • An external KMS to manage your Key Encryption Keys (KEKs). For more information, see Manage KEKs.

  • The KMS provider must be configured for the connector.

  • A Kafka topic to use as a data source or destination.

Manage CSFLE

At a high level, managing CSFLE for self-managed connectors is a two-step process:

  1. Configure CSFLE in Confluent Platform
  2. Enable CSFLE in connectors

Configure CSFLE in Confluent Platform

CSFLE is available in Confluent Enterprise 8.0 and later to help you protect sensitive data in your Confluent Platform account and perform stream processing on encrypted data. You must configure CSFLE in Confluent Platform before you modify an existing self-managed connector or create a new one with CSFLE enabled.

Complete the following configuration steps:

  • Define the schema for the topic and add tags to the fields in the schema that you want to encrypt. For more information, see add tags to the schema fields.

  • Define an encryption policy that specifies the rules used to encrypt the tagged fields. For more information, see add encryption rules.

  • Grant the DeveloperWrite permission for the encryption key.

  • Add the following parameters in the connector configuration:

    For AWS, pass the following configuration parameters:

    Parameter Description
    rule.executors._default_.param.access.key.id=? The AWS access key identifier.
    rule.executors._default_.param.secret.access.key=? The AWS secret access key.

For more information, see Configure CSFLE for use with Confluent Enterprise.
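
As a sketch of the tagging and encryption-rule steps above, a schema registration payload might look like the following. The field name, tag name, and KEK name are illustrative; see the linked CSFLE documentation for the authoritative rule format:

```json
{
  "schemaType": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"card_number\",\"type\":\"string\",\"confluent:tags\":[\"PII\"]}]}",
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptPII",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
          "encrypt.kek.name": "my-kek"
        }
      }
    ]
  }
}
```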

Enable CSFLE in connectors

To enable CSFLE for a connector, set the following parameters in the connector configuration:

Note

If you do not set these values in the connector configuration, CSFLE might not work properly.

  • csfle.enabled=true
  • value.converter.auto.register.schemas=false
  • value.converter.use.latest.version=true
  • key.converter.auto.register.schemas=false
  • key.converter.use.latest.version=true
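
Taken together, a connector creation request with CSFLE enabled might look like the following sketch. The connector class, topic, converter, and Schema Registry URL are illustrative assumptions; connector-specific settings are omitted:

```json
{
  "name": "s3-sink-csfle",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "orders",
    "csfle.enabled": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.auto.register.schemas": "false",
    "value.converter.use.latest.version": "true",
    "key.converter.auto.register.schemas": "false",
    "key.converter.use.latest.version": "true"
  }
}
```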

Note

  • To control how often the connector fetches the latest value schema from Schema Registry, use value.converter.latest.cache.ttl.sec. This parameter defines the interval, in seconds, after which the converter fetches the latest version of the value schema. By default, it is set to -1. To enable it, set it to the desired interval in seconds.
  • Similarly, use key.converter.latest.cache.ttl.sec to define the interval, in seconds, after which the converter fetches the latest key schema from Schema Registry. The default value is -1; change it to the desired interval in seconds.
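
For example, to refresh both cached schemas every five minutes, add the following to the connector configuration:

```properties
# Refetch the latest key and value schemas every 300 seconds
value.converter.latest.cache.ttl.sec=300
key.converter.latest.cache.ttl.sec=300
```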