Implement a Custom KMS Driver on Confluent Platform
This page shows how to implement a custom KMS driver by providing the required
interfaces (KmsDriver, KmsClient, and Aead) and registering it using
Java’s service loader (META-INF/services).
The client-side field level encryption (CSFLE) functionality uses Google Tink, a “secure-by-default” cryptography library. In a CSFLE deployment, a key management service (KMS) securely stores and manages encryption keys. The KMS is external to Confluent Platform and your Apache Kafka® clusters, providing separation of duties between key management and data processing.
To use a custom KMS with the CSFLE functionality in the Java client, three interfaces need to be implemented.
io.confluent.kafka.schemaregistry.encryption.tink.KmsDrivercom.google.crypto.tink.KmsClientcom.google.crypto.tink.Aead
KMS driver
An example of a custom KmsDriver appears below.  The getKeyUrlPrefix method
should return a custom prefix, distinct from aws-kms://, azure-kms://,
gcp-kms://, hcvault://, and local-kms://.
public class CustomKmsDriver implements KmsDriver {
  public CustomKmsDriver() {
  }
  @Override
  public String getKeyUrlPrefix() {
    return "custom-kms://";
  }
  @Override
  public KmsClient newKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
      throws GeneralSecurityException {
    return new CustomKmsClient(configs, kekUrl);
  }
}
The getAead method should return a custom Aead implementation.
public final class CustomKmsClient implements KmsClient {
  public static final String PREFIX = "custom-kms://";
  private String keyUri;
  private Aead aead;
  public CustomKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
      throws GeneralSecurityException {
    String uri = kekUrl.orElse(PREFIX);
    if (!uri.toLowerCase(Locale.US).startsWith(PREFIX)) {
      throw new IllegalArgumentException("key URI must start with " + PREFIX);
    }
    this.keyUri = uri;
    this.aead = new CustomKmsAead(configs, uri);
  }
  /**
   * @return true either if this client is a generic one and uri starts with
   *     {@link CustomKmsClient#PREFIX}, or the client is a specific one that is bound to the key
   *     identified by {@code uri}.
   */
  @Override
  public boolean doesSupport(String uri) {
    if (this.keyUri != null && this.keyUri.equals(uri)) {
      return true;
    }
    return this.keyUri == null && uri.toLowerCase(Locale.US).startsWith(PREFIX);
  }
  /**
   * Loads credentials from a properties file.
   *
   * @throws GeneralSecurityException if the client initialization fails
   */
  @Override
  public KmsClient withCredentials(String credentialPath) throws GeneralSecurityException {
    return this;
  }
  /**
   * Loads default credentials.
   *
   * @throws GeneralSecurityException if the client initialization fails
   */
  @Override
  public KmsClient withDefaultCredentials() throws GeneralSecurityException {
    return this;
  }
  @Override
  public Aead getAead(String uri) throws GeneralSecurityException {
    if (this.keyUri != null && !this.keyUri.equals(uri)) {
      throw new GeneralSecurityException(
          String.format(
              "this client is bound to %s, cannot load keys bound to %s", this.keyUri, uri));
    }
    return aead;
  }
}
Custom Aead example
An example of a custom Aead appears below.  You must implement the encrypt and decrypt
methods to ensure data is encrypted and decrypted.
public final class CustomKmsAead implements Aead {
  public CustomKmsAead(Map<String, ?> configs, String keyUrl) {
    // use configs and keyUrl to create a KMS client to the custom KMS
  }
  @Override
  public byte[] encrypt(final byte[] plaintext, final byte[] associatedData)
      throws GeneralSecurityException {
    try {
      // use the KMS client to encrypt the plaintext
    } catch (Exception e) {
      throw new GeneralSecurityException("encryption failed", e);
    }
  }
  @Override
  public byte[] decrypt(final byte[] ciphertext, final byte[] associatedData)
      throws GeneralSecurityException {
    try {
      // use the KMS client to decrypt the ciphertext
    } catch (Exception e) {
      throw new GeneralSecurityException("decryption failed", e);
    }
  }
}
The final step is to create a provider configuration file named
io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver in the resource
directory META-INF/services.  The contents of the file should contain the
fully qualified name of the custom KmsDriver, such as the following:
com.mycompany.CustomKmsDriver
This tells the service loader what class to instantiate when the KmsDriver
interface is requested.