Kafka Connect Drop SMT Usage Reference for Confluent Platform

The Drop SMT (io.confluent.connect.transforms.Drop) sets the key or value of an Apache Kafka® record to null and lets you control how the corresponding schema is handled.

Description

The Drop SMT sets either the key or value of a message to null. You can also set the corresponding schema to null, make it optional, check that it is already optional, or keep it as-is. Use the transformation type designed for the record key (io.confluent.connect.transforms.Drop$Key) or value (io.confluent.connect.transforms.Drop$Value).

Installation

Confluent develops this transformation and does not ship it by default with Kafka or Confluent Platform. You can install this transformation using the confluent connect plugin install command:

confluent connect plugin install confluentinc/connect-transforms:latest

Properties

schema.behavior

How to handle non-null schemas:

  • nullify (default): Sets the schema to null.

  • retain: Keeps the schema regardless of optional status.

  • validate: Checks that the schema is optional and throws an exception if not.

  • force_optional: Overwrites the schema to be optional if it is not already.

Type: string

Default: nullify

Valid Values: nullify, retain, validate, force_optional

Importance: medium

Examples

The following examples show how to use and configure the Drop SMT.

  • Drop the key from the message. Because schema.behavior is not set, the SMT uses the default value nullify, which also sets the schema to null.

    "transforms": "dropKeyExample,dropValueAndForceOptionalSchemaExample",
    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key"
    
  • Drop the value from the message. If the schema for the value isn’t already optional, this transform updates it to be optional.

    "transforms.dropValueAndForceOptionalSchemaExample.type": "io.confluent.connect.transforms.Drop$Value",
    "transforms.dropValueAndForceOptionalSchemaExample.schema.behavior": "force_optional"
    

Schema behavior options

The following examples show how to use the schema.behavior property with the Drop SMT.

  • Record contents using Drop$Key with schema.behavior set to nullify:

    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
    "transforms.dropKeyExample.schema.behavior": "nullify"
    
    • Before: key: 24, schema: {"type": "integer"}

    • After: key: null, schema: null

    The Drop SMT nullifies the key and the schema.


  • Record contents using Drop$Key with schema.behavior set to retain:

    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
    "transforms.dropKeyExample.schema.behavior": "retain"
    
    • Before: key: 24, schema: {"type": "integer"}

    • After: key: null, schema: {"type": "integer"}

    The Drop SMT nullifies the key and retains the schema.


  • Record contents using Drop$Key with schema.behavior set to validate, where the schema is not optional:

    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
    "transforms.dropKeyExample.schema.behavior": "validate"
    
    • Before: key: 24, schema: {"type": "integer"}

    • After: The SMT throws an exception because the schema is not optional.


  • Record contents using Drop$Key with schema.behavior set to validate, where the schema is optional:

    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
    "transforms.dropKeyExample.schema.behavior": "validate"
    
    • Before: key: 24, schema: {"type": "integer", "optional": true}

    • After: key: null, schema: {"type": "integer", "optional": true}

    The Drop SMT nullifies the key and retains the schema.


  • Record contents using Drop$Key with schema.behavior set to force_optional, where the schema is not optional:

    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
    "transforms.dropKeyExample.schema.behavior": "force_optional"
    
    • Before: key: 24, schema: {"type": "integer"}

    • After: key: null, schema: {"type": "integer", "optional": true}

    The Drop SMT nullifies the key and makes the schema optional.


  • Record contents using Drop$Key with schema.behavior set to force_optional, where the schema is already optional:

    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
    "transforms.dropKeyExample.schema.behavior": "force_optional"
    
    • Before: key: 24, schema: {"type": "integer", "optional": true}

    • After: key: null, schema: {"type": "integer", "optional": true}

    The Drop SMT nullifies the key and retains the schema.
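
The fragments in this section and in Examples omit the surrounding connector configuration. The following is a minimal sketch of a complete configuration that chains the two transforms from the Examples section. It assumes the FileStreamSourceConnector from Apache Kafka as a stand-in connector; the connector name, file path, and topic are hypothetical placeholders, so substitute the values for your own connector. The transforms property is a single comma-separated string of aliases, and Kafka Connect applies the transforms in the order listed.

```json
{
  "name": "drop-smt-example",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/input.txt",
    "topic": "drop-smt-demo",
    "transforms": "dropKeyExample,dropValueAndForceOptionalSchemaExample",
    "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
    "transforms.dropValueAndForceOptionalSchemaExample.type": "io.confluent.connect.transforms.Drop$Value",
    "transforms.dropValueAndForceOptionalSchemaExample.schema.behavior": "force_optional"
  }
}
```

Each alias in transforms must have a matching transforms.<alias>.type property. The schema.behavior property is optional and defaults to nullify when omitted, as it is for dropKeyExample here.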

Predicates

Configure transformations with predicates to ensure they only process records that satisfy a particular condition. You can also use predicates in a transformation chain together with the Filter (Kafka) SMT to conditionally filter specific records. For more information, refer to Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform and to Predicates.
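
As an illustration, the following is a minimal sketch of a predicate attached to the Drop SMT so that the key is dropped only for records from matching topics. It assumes the TopicNameMatches predicate that ships with Apache Kafka; the alias isAuditTopic and the pattern audit-.* are hypothetical, and the fragment shows only the transform-related properties of a connector configuration.

```json
{
  "transforms": "dropKeyExample",
  "transforms.dropKeyExample.type": "io.confluent.connect.transforms.Drop$Key",
  "transforms.dropKeyExample.predicate": "isAuditTopic",
  "predicates": "isAuditTopic",
  "predicates.isAuditTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isAuditTopic.pattern": "audit-.*"
}
```

To apply the transform to every record except those that match, add "transforms.dropKeyExample.negate": "true".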