Connect Secret Registry

Kafka Connect provides a secret serving layer called the Secret Registry. The Secret Registry enables Connect to store encrypted Connect credentials in a topic exposed through a REST API. This eliminates the need to place unencrypted credentials in the connector configuration itself.

Two additional Connect REST API extensions support the Connect Secret Registry. The first extension enables RBAC. The second extension instantiates the Secret Registry node in Connect. Both are set through the rest.extension.classes property, which takes a comma-separated list of class names.

rest.extension.classes=io.confluent.connect.security.ConnectSecurityExtension,io.confluent.connect.secretregistry.ConnectSecretRegistryExtension
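As a rough illustration of how the comma-separated value is read, the following sketch checks a worker properties file for the two extension classes named above. The helper name missing_extensions is hypothetical, and the parser is deliberately simplified (it ignores backslash line continuations); it is not how Connect itself loads the property.

```python
# The two extension classes named in the property above.
REQUIRED_EXTENSIONS = {
    "io.confluent.connect.security.ConnectSecurityExtension",
    "io.confluent.connect.secretregistry.ConnectSecretRegistryExtension",
}


def missing_extensions(properties_text):
    """Return required extension classes absent from rest.extension.classes."""
    configured = set()
    for line in properties_text.splitlines():
        line = line.strip()
        # Skip comments and lines that are not key=value pairs.
        if line.startswith("#") or "=" not in line:
            continue
        name, value = line.split("=", 1)
        if name.strip() == "rest.extension.classes":
            # Split the comma-separated list and drop surrounding whitespace.
            configured = {cls.strip() for cls in value.split(",") if cls.strip()}
    return sorted(REQUIRED_EXTENSIONS - configured)


worker_properties = (
    "rest.extension.classes="
    "io.confluent.connect.security.ConnectSecurityExtension"
)
print(missing_extensions(worker_properties))
```

An empty list means both extensions are present in the property.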

The Connect Secret Registry provides the following:

  • Persistence: Secrets are stored in a compacted topic.
  • Key grouping: Secrets are associated with both a key and a path. This allows multiple keys to be grouped together. Authorization is typically performed at the path level.
  • Versioning: Multiple versions of a secret can be stored.
  • Encryption: Secrets are stored in encrypted format.
  • Master key rotation: The master key for encryption can be changed. This allows all secrets to be re-encrypted if necessary.
  • Auditing: All requests to save or retrieve secrets are logged.

The following sections define the roles used to configure and interact with the Secret Registry and show a worker configuration example.

ResourceOwner and UserAdmin

The ResourceOwner (the user creating a new connector) is responsible for submitting the request for connector credentials to the UserAdmin before creating the connector.

Once the request is received, the UserAdmin creates the secrets for the connector. The path is the connector name, and the keys are the username and password of the service account that has permission to access the topics that the connector consumes from or produces to. The secrets are created using a POST API request. For example:

POST /secret/paths/<connector-name>/keys/<username>/versions
{
  "secret": "<password>"
}
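A minimal sketch of building this request with the Python standard library follows. The worker URL http://localhost:8083, the credentials, the sample values, and the helper name build_create_secret_request are all assumptions for illustration. HTTP Basic authentication is also an assumption; use whatever authentication your Connect REST API requires.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_create_secret_request(base_url, path, key, secret, user, password):
    """Build (but do not send) a POST request that stores one secret version."""
    # Percent-encode the path and key so unusual characters cannot alter the URL.
    url = "{}/secret/paths/{}/keys/{}/versions".format(
        base_url.rstrip("/"),
        urllib.parse.quote(path, safe=""),
        urllib.parse.quote(key, safe=""),
    )
    body = json.dumps({"secret": secret}).encode("utf-8")
    # Assumption: HTTP Basic auth; replace with your deployment's scheme.
    credentials = "{}:{}".format(user, password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + token,
        },
    )


# Hypothetical values: connector "my-connector", key "password".
request = build_create_secret_request(
    "http://localhost:8083", "my-connector", "password",
    "service-account-password", "user-admin", "user-admin-password",
)
print(request.get_method(), request.full_url)
# To send the request: urllib.request.urlopen(request)
```

The request is only constructed here, so the sketch can be inspected without a running Connect worker.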

The following properties are then included in the connector configuration:

Sink connector properties:

consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="${secret:<connector-name>:<username>}" \
  password="${secret:<connector-name>:<password>}" \
  metadataServerUrls="http://<metadata server URLS>:8090";

Source connector properties:

producer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="${secret:<connector-name>:<username>}" \
  password="${secret:<connector-name>:<password>}" \
  metadataServerUrls="http://<metadata server URLS>:8090";

When the user submits the connector configuration, Connect validates that every external variable reference has a path that matches the connector ID. If any reference uses a path that does not match the connector ID, the connector configuration is rejected.
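The rule can be illustrated with a short sketch that scans a connector configuration for ${secret:<path>:<key>} references and reports any path that differs from the connector name. This is only an approximation of the check described above, not Connect's implementation. The function name, the regular expression, and the sample connector names are assumptions.

```python
import re

# Matches references of the form ${secret:<path>:<key>}.
SECRET_REF = re.compile(r"\$\{secret:([^:}]+):([^}]+)\}")


def mismatched_secret_paths(connector_name, config):
    """Return the sorted secret paths that differ from the connector name."""
    bad = set()
    for value in config.values():
        for path, _key in SECRET_REF.findall(str(value)):
            if path != connector_name:
                bad.add(path)
    return sorted(bad)


# Hypothetical sink connector whose password points at another connector's path.
config = {
    "consumer.override.sasl.jaas.config": (
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
        'username="${secret:orders-sink:username}" '
        'password="${secret:payments-sink:password}";'
    ),
}
print(mismatched_secret_paths("orders-sink", config))  # prints ['payments-sink']
```

A configuration like this one would be rejected, because the password reference uses the path of a different connector.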

Worker configuration example

The following example shows the Secret Registry configuration parameters used in the Connect worker. Note the following:

  • The <service-principal-username> is the $CONNECT_USER that was granted access to the $SECRET_REGISTRY_GROUP and the $SECRETS_TOPIC in Configuring RBAC for a Connect cluster.
  • The <service-principal-password> is the password used for $CONNECT_USER in Configuring RBAC for a Connect cluster.
  • You can also specify the parameter config.providers.secret.param.master.encryption.old.key when you update the master key. When specified, all secrets are decrypted using the old key and re-encrypted using the new key.

### Secret Provider

config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider

config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="<service-principal-username>" \
  password="<service-principal-password>" \
  metadataServerUrls="<metadata server URLs>";

REST API endpoints

The following are the REST API endpoints used for the Connect Secret Registry.

Create a secret

POST /secret/paths/{path}/keys/{key}/versions
{
  "secret": "my secret"
}

Get a secret

GET /secret/paths/{path}/keys/{key}/versions/latest

Get a specific version of a key

GET /secret/paths/{path}/keys/{key}/versions/{version}

Get all versions of a key

GET /secret/paths/{path}/keys/{key}

Get all latest versions for keys in a path

GET /secret/paths/{path}

List the versions of a key

GET /secret/paths/{path}/keys/{key}/versions

List the keys in a path

GET /secret/paths/{path}/keys

List all paths

GET /secret/paths

Delete a specific version of a key

DELETE /secret/paths/{path}/keys/{key}/versions/{version}

Delete all versions of a key

DELETE /secret/paths/{path}/keys/{key}

Delete a path

DELETE /secret/paths/{path}
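The endpoints above can be collected into a small lookup table so that client code fills in the placeholders consistently. The following is a sketch only. The operation names (such as get_latest) and the resolve helper are hypothetical labels, not part of the Secret Registry API; the methods and URL templates are copied from the list above.

```python
from urllib.parse import quote

# Hypothetical operation names mapped to the documented method and template.
ENDPOINTS = {
    "create_secret": ("POST", "/secret/paths/{path}/keys/{key}/versions"),
    "get_latest": ("GET", "/secret/paths/{path}/keys/{key}/versions/latest"),
    "get_version": ("GET", "/secret/paths/{path}/keys/{key}/versions/{version}"),
    "get_all_versions": ("GET", "/secret/paths/{path}/keys/{key}"),
    "get_path_latest": ("GET", "/secret/paths/{path}"),
    "list_versions": ("GET", "/secret/paths/{path}/keys/{key}/versions"),
    "list_keys": ("GET", "/secret/paths/{path}/keys"),
    "list_paths": ("GET", "/secret/paths"),
    "delete_version": ("DELETE", "/secret/paths/{path}/keys/{key}/versions/{version}"),
    "delete_key": ("DELETE", "/secret/paths/{path}/keys/{key}"),
    "delete_path": ("DELETE", "/secret/paths/{path}"),
}


def resolve(operation, **params):
    """Return (method, url_path) with each placeholder percent-encoded."""
    method, template = ENDPOINTS[operation]
    encoded = {name: quote(str(value), safe="") for name, value in params.items()}
    # A missing placeholder raises KeyError instead of producing a bad URL.
    return method, template.format(**encoded)


print(resolve("get_version", path="my-connector", key="password", version=2))
print(resolve("list_paths"))
```

Encoding each segment with safe="" keeps a path or key that contains "/" from changing which endpoint is addressed.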