Implement a Custom KMS Driver

Client-side field level encryption (CSFLE) in Confluent Cloud uses the Java implementation of Google Tink, a “secure-by-default” encryption library. To use a custom KMS with CSFLE in the Java client, implement the following three interfaces:

  • io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver
  • com.google.crypto.tink.KmsClient
  • com.google.crypto.tink.Aead

Example KMS driver

An example of a custom KMS driver is included below. The getKeyUrlPrefix method should return a custom prefix that is distinct from the built-in prefixes aws-kms://, azure-kms://, gcp-kms://, hcvault://, and local-kms://.

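import com.google.crypto.tink.KmsClient;
import io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver;
import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.Optional;

public class CustomKmsDriver implements KmsDriver {

  // The service loader needs a public no-argument constructor to instantiate the driver.
  public CustomKmsDriver() {
  }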

  @Override
  public String getKeyUrlPrefix() {
    return "custom-kms://";
  }

  @Override
  public KmsClient newKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
      throws GeneralSecurityException {
    return new CustomKmsClient(configs, kekUrl);
  }
}

Example KMS client

An example of a custom KMS client (KmsClient) is included below.

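import com.google.crypto.tink.Aead;
import com.google.crypto.tink.KmsClient;
import java.security.GeneralSecurityException;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public final class CustomKmsClient implements KmsClient {

  public static final String PREFIX = "custom-kms://";

  // Key URI this client is bound to, and the AEAD that talks to the custom KMS for that key.
  private final String keyUri;
  private final Aead aead;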

  public CustomKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
      throws GeneralSecurityException {
    String uri = kekUrl.orElse(PREFIX);
    if (!uri.toLowerCase(Locale.US).startsWith(PREFIX)) {
      throw new IllegalArgumentException("key URI must start with " + PREFIX);
    }
    this.keyUri = uri;
    this.aead = new CustomKmsAead(configs, uri);
  }

  /**
   * @return true either if this client is a generic one and uri starts with
   *     {@link CustomKmsClient#PREFIX}, or the client is a specific one that is bound to the key
   *     identified by {@code uri}.
   */
  @Override
  public boolean doesSupport(String uri) {
    if (this.keyUri != null && this.keyUri.equals(uri)) {
      return true;
    }
    return this.keyUri == null && uri.toLowerCase(Locale.US).startsWith(PREFIX);
  }

  /**
   * Would load credentials from {@code credentialPath}. This example has no credentials to
   * load, so it returns the client unchanged.
   *
   * @throws GeneralSecurityException if the client initialization fails
   */
  @Override
  public KmsClient withCredentials(String credentialPath) throws GeneralSecurityException {
    return this;
  }

  /**
   * Would load default credentials. This example has no credentials to load, so it returns
   * the client unchanged.
   *
   * @throws GeneralSecurityException if the client initialization fails
   */
  @Override
  public KmsClient withDefaultCredentials() throws GeneralSecurityException {
    return this;
  }

  @Override
  public Aead getAead(String uri) throws GeneralSecurityException {
    if (this.keyUri != null && !this.keyUri.equals(uri)) {
      throw new GeneralSecurityException(
          String.format(
              "this client is bound to %s, cannot load keys bound to %s", this.keyUri, uri));
    }

    return aead;
  }
}

Example of a custom KMS driver using authenticated encryption

A custom KMS driver performs authenticated encryption with associated data (AEAD), which the Tink library exposes through the Aead interface. AEAD encrypts and authenticates the plaintext, and also authenticates additional associated data (such as headers) that is not encrypted. Any tampering with the ciphertext or the associated data is detected when the data is decrypted.
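To illustrate the AEAD guarantee described above, the following sketch uses AES-GCM from the standard Java cryptography API rather than Tink or a real KMS. The class name AeadDemo, its helper methods, and the sample strings are hypothetical and exist only for this illustration. The sketch shows that decryption succeeds only when the same associated data is presented that was used during encryption.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.AEADBadTagException;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical demo class; it is not part of Tink or the Confluent client.
class AeadDemo {
  private static final int NONCE_BYTES = 12;
  private static final int TAG_BITS = 128;

  // Encrypts plaintext and binds associatedData to it. Output layout: nonce || ciphertext+tag.
  static byte[] encrypt(SecretKey key, byte[] plaintext, byte[] associatedData)
      throws GeneralSecurityException {
    byte[] nonce = new byte[NONCE_BYTES];
    new SecureRandom().nextBytes(nonce);
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, nonce));
    cipher.updateAAD(associatedData);
    byte[] body = cipher.doFinal(plaintext);
    byte[] out = Arrays.copyOf(nonce, nonce.length + body.length);
    System.arraycopy(body, 0, out, nonce.length, body.length);
    return out;
  }

  // Decrypts nonce || ciphertext+tag; throws AEADBadTagException if anything was altered.
  static byte[] decrypt(SecretKey key, byte[] ciphertext, byte[] associatedData)
      throws GeneralSecurityException {
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.DECRYPT_MODE, key,
        new GCMParameterSpec(TAG_BITS, ciphertext, 0, NONCE_BYTES));
    cipher.updateAAD(associatedData);
    return cipher.doFinal(ciphertext, NONCE_BYTES, ciphertext.length - NONCE_BYTES);
  }

  // Returns true if a message encrypted with encryptAad decrypts when decryptAad is presented.
  static boolean roundTrips(String message, String encryptAad, String decryptAad) {
    try {
      KeyGenerator generator = KeyGenerator.getInstance("AES");
      generator.init(128);
      SecretKey key = generator.generateKey();
      byte[] ciphertext = encrypt(key, utf8(message), utf8(encryptAad));
      return Arrays.equals(utf8(message), decrypt(key, ciphertext, utf8(decryptAad)));
    } catch (AEADBadTagException e) {
      return false; // authentication failed: the associated data does not match
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  private static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(roundTrips("secret", "topic=orders", "topic=orders"));  // true
    System.out.println(roundTrips("secret", "topic=orders", "topic=refunds")); // false
  }
}
```

In a custom KMS integration, the key never leaves the KMS, so the equivalent of these encrypt and decrypt calls would be remote requests rather than local cipher operations.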

An example AEAD implementation for a custom KMS is included below. The CustomKmsAead class handles the core encryption and decryption logic. The CustomKmsClient class shown earlier instantiates CustomKmsAead, and CustomKmsDriver in turn instantiates CustomKmsClient.

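import com.google.crypto.tink.Aead;
import java.security.GeneralSecurityException;
import java.util.Map;

public final class CustomKmsAead implements Aead {

  public CustomKmsAead(Map<String, ?> configs, String keyUrl) {
    // use configs and keyUrl to create a KMS client to the custom KMS
  }

  @Override
  public byte[] encrypt(final byte[] plaintext, final byte[] associatedData)
      throws GeneralSecurityException {
    try {
      // use the KMS client to encrypt the plaintext and return the ciphertext;
      // the placeholder below keeps the class compilable until that call is added
      throw new UnsupportedOperationException("encrypt is not implemented");
    } catch (Exception e) {
      throw new GeneralSecurityException("encryption failed", e);
    }
  }

  @Override
  public byte[] decrypt(final byte[] ciphertext, final byte[] associatedData)
      throws GeneralSecurityException {
    try {
      // use the KMS client to decrypt the ciphertext and return the plaintext;
      // the placeholder below keeps the class compilable until that call is added
      throw new UnsupportedOperationException("decrypt is not implemented");
    } catch (Exception e) {
      throw new GeneralSecurityException("decryption failed", e);
    }
  }
}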
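The CustomKmsAead constructor receives the full key URL, including the custom-kms:// prefix. Most KMS APIs expect only a key identifier, so the constructor typically needs to strip the prefix first. The following is a minimal sketch of that step, assuming the key identifier is everything after the prefix. The KeyUrls class and the keyIdFrom method are hypothetical helpers, not part of Tink or the Confluent client.

```java
import java.util.Locale;

// Hypothetical helper; the URL layout "custom-kms://<key id>" is an assumption.
final class KeyUrls {
  static final String PREFIX = "custom-kms://";

  private KeyUrls() {
  }

  // Returns the part of keyUrl that follows the prefix, preserving its original case.
  static String keyIdFrom(String keyUrl) {
    if (keyUrl == null || !keyUrl.toLowerCase(Locale.US).startsWith(PREFIX)) {
      throw new IllegalArgumentException("key URL must start with " + PREFIX);
    }
    String keyId = keyUrl.substring(PREFIX.length());
    if (keyId.isEmpty()) {
      throw new IllegalArgumentException("key URL does not contain a key identifier");
    }
    return keyId;
  }

  public static void main(String[] args) {
    System.out.println(keyIdFrom("custom-kms://my-key")); // my-key
  }
}
```

The prefix check is case-insensitive to match the check in CustomKmsClient, while the returned identifier keeps its original case because key identifiers may be case-sensitive.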

In the resource directory META-INF/services, create a file named as follows:

io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver

This file should contain the fully qualified class name of the custom KmsDriver, for example:

com.mycompany.CustomKmsDriver

This tells the Java service loader (java.util.ServiceLoader) which class to instantiate when an implementation of the KmsDriver interface is requested.
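The following sketch shows how service-loader discovery and a prefix-based lookup could fit together. It is an illustration only: the nested Driver interface is a stand-in for KmsDriver reduced to a single method, and findDriver is a hypothetical helper. The lookup logic that the Confluent client actually uses is not shown in this document.

```java
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical sketch; not the Confluent client's own lookup code.
final class DriverLookupSketch {

  // Stand-in for the real KmsDriver interface, reduced to the one method used here.
  interface Driver {
    String getKeyUrlPrefix();
  }

  private DriverLookupSketch() {
  }

  // Returns the first driver whose prefix matches the start of kekUrl, if any.
  static Optional<Driver> findDriver(Iterable<? extends Driver> drivers, String kekUrl) {
    for (Driver driver : drivers) {
      if (kekUrl.startsWith(driver.getKeyUrlPrefix())) {
        return Optional.of(driver);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // ServiceLoader instantiates every class listed in the matching META-INF/services
    // file on the classpath, using each class's public no-argument constructor.
    ServiceLoader<Driver> discovered = ServiceLoader.load(Driver.class);
    System.out.println(findDriver(discovered, "custom-kms://my-key").isPresent());

    // The same lookup against an explicit list, without any classpath registration.
    Driver custom = () -> "custom-kms://";
    System.out.println(findDriver(List.of(custom), "custom-kms://my-key").isPresent());
  }
}
```

If the provider file is missing or misnamed, the service loader finds no implementations, so a driver that is never discovered is usually a sign of a wrong file name or location under META-INF/services.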