Implement a Custom KMS Driver on Confluent Platform
Client-side field level encryption (CSFLE) uses Google Tink, a “secure-by-default” encryption library. In a CSFLE implementation, a key management service (KMS) securely stores and manages the encryption keys. The KMS is external to the database, so key management stays separate from database management, which adds a layer of security. When creating a custom KMS driver, do not encrypt fields that are used for filtering or sorting unless you have a compelling reason to do so.
To use a custom KMS with CSFLE in the Java client, implement the following three interfaces:
io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver
com.google.crypto.tink.KmsClient
com.google.crypto.tink.Aead
KMS driver
An example of a custom KmsDriver appears below. The getKeyUrlPrefix method should return a custom prefix, distinct from aws-kms://, azure-kms://, gcp-kms://, hcvault://, and local-kms://.
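import com.google.crypto.tink.KmsClient;
import io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver;
import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.Optional;

public class CustomKmsDriver implements KmsDriver {

    // The service loader needs a public no-argument constructor.
    public CustomKmsDriver() {
    }

    @Override
    public String getKeyUrlPrefix() {
        return "custom-kms://";
    }

    @Override
    public KmsClient newKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
            throws GeneralSecurityException {
        return new CustomKmsClient(configs, kekUrl);
    }
}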
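The prefix rule above can be sanity-checked before the driver is deployed. The following is a minimal sketch that uses only the JDK. The class KeyUrlPrefixCheck and its methods are hypothetical helpers, not part of Confluent Platform or Tink, and the assumption that the key identifier is whatever follows the prefix is for illustration only.

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical helper for checking a custom KEK URL prefix; not a Confluent or Tink API. */
final class KeyUrlPrefixCheck {

    // Built-in prefixes named in this document; a custom driver must not reuse them.
    static final List<String> BUILT_IN = List.of(
            "aws-kms://", "azure-kms://", "gcp-kms://", "hcvault://", "local-kms://");

    private KeyUrlPrefixCheck() {
    }

    /** Returns true if the candidate prefix equals a built-in prefix, ignoring case. */
    static boolean collidesWithBuiltIn(String prefix) {
        return BUILT_IN.contains(prefix.toLowerCase(Locale.US));
    }

    /** Returns the part of the KEK URL after the prefix (assumed here to be the key id). */
    static String keyId(String prefix, String kekUrl) {
        if (!kekUrl.toLowerCase(Locale.US).startsWith(prefix)) {
            throw new IllegalArgumentException("key URI must start with " + prefix);
        }
        return kekUrl.substring(prefix.length());
    }

    public static void main(String[] args) {
        System.out.println(collidesWithBuiltIn("custom-kms://"));
        System.out.println(collidesWithBuiltIn("aws-kms://"));
        System.out.println(keyId("custom-kms://", "custom-kms://my-key"));
    }
}
```

A check like this fits naturally in a unit test for the driver, so that a prefix collision is caught at build time rather than at encryption time.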
An example of a custom KmsClient appears below. The getAead method should return a custom Aead implementation.
import com.google.crypto.tink.Aead;
import com.google.crypto.tink.KmsClient;
import java.security.GeneralSecurityException;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public final class CustomKmsClient implements KmsClient {

    public static final String PREFIX = "custom-kms://";

    private final String keyUri;
    private final Aead aead;

    public CustomKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
            throws GeneralSecurityException {
        // Fall back to the bare prefix when no KEK URL is supplied.
        String uri = kekUrl.orElse(PREFIX);
        if (!uri.toLowerCase(Locale.US).startsWith(PREFIX)) {
            throw new IllegalArgumentException("key URI must start with " + PREFIX);
        }
        this.keyUri = uri;
        this.aead = new CustomKmsAead(configs, uri);
    }

    /**
     * @return true either if this client is a generic one and uri starts with
     *     {@link CustomKmsClient#PREFIX}, or the client is a specific one that is bound to the key
     *     identified by {@code uri}.
     */
    @Override
    public boolean doesSupport(String uri) {
        if (this.keyUri != null && this.keyUri.equals(uri)) {
            return true;
        }
        return this.keyUri == null && uri.toLowerCase(Locale.US).startsWith(PREFIX);
    }

    /**
     * Loads credentials from a properties file.
     *
     * @throws GeneralSecurityException if the client initialization fails
     */
    @Override
    public KmsClient withCredentials(String credentialPath) throws GeneralSecurityException {
        // No-op in this example; load credentials here if the custom KMS requires them.
        return this;
    }

    /**
     * Loads default credentials.
     *
     * @throws GeneralSecurityException if the client initialization fails
     */
    @Override
    public KmsClient withDefaultCredentials() throws GeneralSecurityException {
        // No-op in this example.
        return this;
    }

    @Override
    public Aead getAead(String uri) throws GeneralSecurityException {
        if (this.keyUri != null && !this.keyUri.equals(uri)) {
            throw new GeneralSecurityException(
                    String.format(
                            "this client is bound to %s, cannot load keys bound to %s",
                            this.keyUri, uri));
        }
        return aead;
    }
}
Custom Aead example
An example of a custom Aead appears below. Implement the encrypt and decrypt methods so that they call the custom KMS to encrypt and decrypt data.
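import com.google.crypto.tink.Aead;
import java.security.GeneralSecurityException;
import java.util.Map;

public final class CustomKmsAead implements Aead {

    public CustomKmsAead(Map<String, ?> configs, String keyUrl) {
        // use configs and keyUrl to create a KMS client to the custom KMS
    }

    @Override
    public byte[] encrypt(final byte[] plaintext, final byte[] associatedData)
            throws GeneralSecurityException {
        try {
            // use the KMS client to encrypt the plaintext and return the ciphertext
            // placeholder so the skeleton compiles; replace with the call to the custom KMS
            throw new UnsupportedOperationException("encrypt is not implemented");
        } catch (Exception e) {
            throw new GeneralSecurityException("encryption failed", e);
        }
    }

    @Override
    public byte[] decrypt(final byte[] ciphertext, final byte[] associatedData)
            throws GeneralSecurityException {
        try {
            // use the KMS client to decrypt the ciphertext and return the plaintext
            // placeholder so the skeleton compiles; replace with the call to the custom KMS
            throw new UnsupportedOperationException("decrypt is not implemented");
        } catch (Exception e) {
            throw new GeneralSecurityException("decryption failed", e);
        }
    }
}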
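To make the encrypt and decrypt contract concrete, here is a minimal sketch that performs authenticated encryption locally with the JDK's AES-GCM cipher. It is a stand-in, not a KMS integration: the class LocalAesGcmAead, the in-memory key, and the output layout (IV followed by ciphertext and tag) are all assumptions for illustration. A real driver would implement com.google.crypto.tink.Aead and send both operations to the custom KMS.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

/** Hypothetical local stand-in for a KMS-backed Aead; the key never leaves this object. */
final class LocalAesGcmAead {

    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;

    private final SecretKey key;
    private final SecureRandom random = new SecureRandom();

    LocalAesGcmAead() throws GeneralSecurityException {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        this.key = generator.generateKey();
    }

    public byte[] encrypt(byte[] plaintext, byte[] associatedData)
            throws GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        random.nextBytes(iv); // a fresh IV for every message
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        if (associatedData != null) {
            cipher.updateAAD(associatedData); // authenticated but not encrypted
        }
        byte[] sealed = cipher.doFinal(plaintext);
        // Assumed layout: IV, then ciphertext with the authentication tag appended.
        return ByteBuffer.allocate(iv.length + sealed.length).put(iv).put(sealed).array();
    }

    public byte[] decrypt(byte[] ciphertext, byte[] associatedData)
            throws GeneralSecurityException {
        if (ciphertext.length < IV_BYTES + TAG_BITS / 8) {
            throw new GeneralSecurityException("ciphertext too short");
        }
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key,
                new GCMParameterSpec(TAG_BITS, ciphertext, 0, IV_BYTES));
        if (associatedData != null) {
            cipher.updateAAD(associatedData);
        }
        // Throws AEADBadTagException if the data or associated data was altered.
        return cipher.doFinal(ciphertext, IV_BYTES, ciphertext.length - IV_BYTES);
    }

    /** Encrypts and decrypts a message; returns false if decryption is rejected. */
    static boolean roundTrips(String message, String encryptAad, String decryptAad) {
        try {
            LocalAesGcmAead aead = new LocalAesGcmAead();
            byte[] sealed = aead.encrypt(
                    message.getBytes(StandardCharsets.UTF_8),
                    encryptAad.getBytes(StandardCharsets.UTF_8));
            byte[] opened = aead.decrypt(sealed, decryptAad.getBytes(StandardCharsets.UTF_8));
            return message.equals(new String(opened, StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrips("secret", "context", "context"));
        System.out.println(roundTrips("secret", "context", "other"));
    }
}
```

The second call in main shows why associatedData matters: decryption fails when the associated data differs from what was supplied at encryption time, even though the key and ciphertext are unchanged.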
The final step is to create a provider configuration file named io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver in the resource directory META-INF/services. The file should contain the fully qualified name of the custom KmsDriver, such as the following:
com.mycompany.CustomKmsDriver
This tells the service loader which class to instantiate when the KmsDriver interface is requested.
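The lookup that the provider configuration file enables can be sketched with java.util.ServiceLoader from the JDK. To keep the example free of external dependencies, the interface DemoDriver is a hypothetical stand-in for KmsDriver, and DriverLookupDemo is an illustrative name. How the CSFLE library itself performs the lookup is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class DriverLookupDemo {

    /** Collects the prefixes of every DemoDriver provider registered on the classpath. */
    static List<String> registeredPrefixes() {
        List<String> prefixes = new ArrayList<>();
        // ServiceLoader reads META-INF/services/<interface name> from the classpath
        // and instantiates each class listed there through its no-argument constructor.
        for (DemoDriver driver : ServiceLoader.load(DemoDriver.class)) {
            prefixes.add(driver.getKeyUrlPrefix());
        }
        return prefixes;
    }

    public static void main(String[] args) {
        System.out.println("registered prefixes: " + registeredPrefixes());
    }
}

/** Hypothetical stand-in for the KmsDriver interface. */
interface DemoDriver {
    String getKeyUrlPrefix();
}
```

When no provider configuration file for the interface is on the classpath, the loader reports no providers instead of failing. An empty result from a lookup like this is therefore a useful first check when a custom driver does not appear to be picked up: the file may be missing, misnamed, or packaged outside META-INF/services.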