Kafka Connect Drop SMT for Confluent Cloud

The following provides usage information for the Confluent Single Message Transformation (SMT) io.confluent.connect.transforms.Drop.

Description

Sets either the key or the value of a message to null. The corresponding schema can also be set to null, made optional, validated as already optional, or kept as-is. Use the concrete transformation type designed for the record key (io.confluent.connect.transforms.Drop$Key) or value (io.confluent.connect.transforms.Drop$Value).

Installation

This transformation is developed by Confluent and does not ship by default with Kafka or Confluent Cloud. You can install this transformation using the confluent connect plugin install command:

confluent connect plugin install confluentinc/connect-transforms:latest

Examples

The configuration snippets below show how to use and configure the Drop SMT.

Drop the key from the message, using the default schema behavior (nullify), which sets the key schema to null if it is not already null.

"transforms": "dropKeyExample,dropValueAndForceOptionalSchemaExample",
"transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key"

Drop the value from the message. If the schema for the value isn’t already optional, forcefully overwrite it to become optional.

"transforms.dropValueAndForceOptionalSchemaExample.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.dropValueAndForceOptionalSchemaExample.schema.behavior": "force_optional"

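The two snippets above are fragments of a single connector configuration. As a minimal sketch, the following Python script assembles them into one JSON object. The helper name build_drop_transforms is hypothetical, and the script only builds a dictionary; it does not contact Kafka Connect or Confluent Cloud. It relies on the transforms property being one comma-separated string of aliases, listed in the order the transformations run.

```python
import json


def build_drop_transforms():
    """Assemble the two example transforms into one config fragment.

    Hypothetical helper for illustration only.
    """
    aliases = ["dropKeyExample", "dropValueAndForceOptionalSchemaExample"]
    return {
        # All aliases go into a single comma-separated string.
        "transforms": ",".join(aliases),
        "transforms.dropKeyExample.type":
            "io.confluent.connect.transforms.Drop$Key",
        "transforms.dropValueAndForceOptionalSchemaExample.type":
            "io.confluent.connect.transforms.Drop$Value",
        "transforms.dropValueAndForceOptionalSchemaExample.schema.behavior":
            "force_optional",
    }


if __name__ == "__main__":
    # Print the fragment so it can be merged into a connector configuration.
    print(json.dumps(build_drop_transforms(), indent=2))
```

Because dropKeyExample does not set schema.behavior, it uses the default behavior.
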
Record contents using Drop$Key with schema.behavior set to nullify:

"transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
"transforms.dropKeyExample.schema.behavior": "nullify"
  • Before: key: 24, schema: {"type": "integer"}
  • After: key: null, schema: null

The key is nullified and the schema is nullified.

Record contents using Drop$Key with schema.behavior set to retain:

"transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
"transforms.dropKeyExample.schema.behavior": "retain"
  • Before: key: 24, schema: {"type": "integer"}
  • After: key: null, schema: {"type": "integer"}

The key is nullified and the schema is unchanged.

Record contents using Drop$Key with schema.behavior set to validate, where the schema is not optional:

"transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
"transforms.dropKeyExample.schema.behavior": "validate"
  • Before: key: 24, schema: {"type": "integer"}
  • After: Throws exception because the schema is not optional.

Record contents using Drop$Key with schema.behavior set to validate, where the schema is optional:

"transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
"transforms.dropKeyExample.schema.behavior": "validate"
  • Before: key: 24, schema: {"type": "integer", "optional": true}
  • After: key: null, schema: {"type": "integer", "optional": true}

The key is nullified and the schema is unchanged.

Record contents using Drop$Key with schema.behavior set to force_optional, where the schema is not optional:

"transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
"transforms.dropKeyExample.schema.behavior": "force_optional"
  • Before: key: 24, schema: {"type": "integer"}
  • After: key: null, schema: {"type": "integer", "optional": true}

The key is nullified and the schema is made optional.

Record contents using Drop$Key with schema.behavior set to force_optional, where the schema is already optional:

"transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
"transforms.dropKeyExample.schema.behavior": "force_optional"
  • Before: key: 24, schema: {"type": "integer", "optional": true}
  • After: key: null, schema: {"type": "integer", "optional": true}

The key is nullified and the schema is unchanged.
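
The before-and-after cases above can be reproduced with a small model. The following Python sketch is illustrative only and is not the Confluent implementation: the function name apply_drop is hypothetical, schemas are represented as plain dictionaries, ValueError stands in for whatever exception the SMT actually throws, and a null input schema is assumed to stay null for every behavior.

```python
def apply_drop(schema, behavior="nullify"):
    """Model Drop$Key on one record and return (new_key, new_schema).

    Illustrative model only. A missing "optional" entry in the schema
    dictionary is treated as "not optional".
    """
    new_key = None  # The key itself is always set to null.
    # Assumption: a schema that is already null stays null.
    if schema is None or behavior == "nullify":
        return new_key, None
    if behavior == "retain":
        return new_key, schema
    is_optional = schema.get("optional", False)
    if behavior == "validate":
        if not is_optional:
            # Stand-in exception type for this sketch.
            raise ValueError("schema is not optional")
        return new_key, schema
    if behavior == "force_optional":
        # Copy the schema so the caller's dictionary is not modified.
        return new_key, schema if is_optional else {**schema, "optional": True}
    raise ValueError(f"unknown schema.behavior: {behavior!r}")


if __name__ == "__main__":
    required = {"type": "integer"}
    optional = {"type": "integer", "optional": True}
    cases = [
        ("nullify", required),
        ("retain", required),
        ("validate", optional),
        ("force_optional", required),
        ("force_optional", optional),
    ]
    for behavior, schema in cases:
        print(behavior, schema, "->", apply_drop(schema, behavior))
```

Calling apply_drop({"type": "integer"}, "validate") raises the stand-in exception, which corresponds to the validate case with a non-optional schema.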

Tip

For additional examples, see Drop for managed connectors.

Properties

schema.behavior

How to handle non-null schemas. If set to nullify, then the schema for the new record is null. If set to retain, the schema is used, regardless of whether it is optional. If set to validate, the schema is checked first, and if it is optional, it is used as-is; otherwise, an exception is thrown. If set to force_optional, the schema is overwritten to be optional, if it is not already.
  • Type: string
  • Default: nullify
  • Valid Values: [nullify, retain, validate, force_optional]
  • Importance: medium
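
As a rough illustration of the default and the valid values, the following Python sketch looks up schema.behavior for one transform alias in a flat configuration dictionary. The helper resolve_schema_behavior is hypothetical and is not part of any Confluent tool. It assumes values are matched exactly (case-sensitive), which this page does not state.

```python
VALID_BEHAVIORS = ("nullify", "retain", "validate", "force_optional")
DEFAULT_BEHAVIOR = "nullify"


def resolve_schema_behavior(config, alias):
    """Return the effective schema.behavior for one transform alias.

    Hypothetical pre-flight check: falls back to the default when the
    property is absent and rejects values outside the valid set.
    """
    key = f"transforms.{alias}.schema.behavior"
    value = config.get(key, DEFAULT_BEHAVIOR)
    if value not in VALID_BEHAVIORS:
        raise ValueError(
            f"{key} must be one of {list(VALID_BEHAVIORS)}, got {value!r}"
        )
    return value


if __name__ == "__main__":
    config = {
        "transforms": "dropKeyExample",
        "transforms.dropKeyExample.type":
            "io.confluent.connect.transforms.Drop$Key",
    }
    # No schema.behavior is set, so the default applies.
    print(resolve_schema_behavior(config, "dropKeyExample"))
```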

Predicates

You can configure a transformation with predicates so that it is applied only to records that satisfy a condition. Predicates can be used anywhere in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Cloud, can conditionally filter out specific records. For details and examples, see Predicates.
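
As a hedged sketch of how a predicate might be attached to the Drop SMT, the following Python script builds a configuration fragment that applies Drop$Key only to records from matching topics. It assumes the TopicNameMatches predicate that ships with Apache Kafka is available to the connector. The helper name, the aliases, and the topic pattern are placeholders chosen for illustration.

```python
import json


def drop_key_when_topic_matches(topic_pattern):
    """Build a config fragment that applies Drop$Key only on matching topics.

    Hypothetical helper; it only builds a dictionary.
    """
    transform = "dropKeyExample"
    predicate = "topicMatches"
    return {
        "transforms": transform,
        f"transforms.{transform}.type":
            "io.confluent.connect.transforms.Drop$Key",
        # Attach the predicate to the transform by its alias.
        f"transforms.{transform}.predicate": predicate,
        "predicates": predicate,
        f"predicates.{predicate}.type":
            "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
        # Regular expression that the record's topic name must match.
        f"predicates.{predicate}.pattern": topic_pattern,
    }


if __name__ == "__main__":
    # "orders-.*" is a placeholder pattern.
    print(json.dumps(drop_key_when_topic_matches("orders-.*"), indent=2))
```

Records from topics that do not match the pattern pass through the transformation unchanged.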