Single Message Transforms for Managed Connectors

Confluent Cloud managed connectors can be configured with one or more single message transforms (SMTs) to make simple and lightweight modifications to message values, keys, and headers. This can be convenient for inserting fields, masking information, event routing, and other minor data adjustments. SMTs can be used with sink and source connectors.

When multiple SMTs are used with a source connector, Connect passes each source record produced by the connector through the first SMT, which makes its modifications. The updated source record is then passed to the next SMT in the chain. This sequence continues for any additional SMTs in the chain. The final updated source record is converted to binary format (bytes) and written to Apache Kafka®.

When multiple SMTs are used with a sink connector, Connect first reads the record from Kafka and converts the bytes to a sink record. Connect then passes the record through the SMT, which makes its modifications. The updated sink record is then passed to the next SMT in the chain. This continues for the remaining SMTs. The final updated sink record is then passed to the sink connector for processing.
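As a sketch, a chain of two SMTs looks like the following fragment of a connector configuration (the aliases maskField and insertField and their settings are illustrative, not from a specific connector). Connect applies the transforms in the order their aliases appear in the transforms property:

```json
{
  "transforms": "maskField,insertField",
  "transforms.maskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskField.fields": "userid",
  "transforms.insertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertField.static.field": "source",
  "transforms.insertField.static.value": "datagen"
}
```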

Note

  • SMTs are very useful, but should only be used for simple data transformations. More sophisticated transformations and data integrations should be handled using ksqlDB and Kafka stream processing.
  • See SMT Limitations for additional information.

How to use SMTs with the Cloud Console

The following steps show how to set up an SMT using the Confluent Cloud Console. The steps use the Datagen Source Connector for Confluent Cloud.

Note

  • The connector applies transforms in the order in which they are created.
  • To apply a predicate to a transform, you must create the connector configuration using the CLI.
  1. Go to the advanced configurations section of the connector UI and click Add a single message transform.

    Single Message Transforms button

  2. Accept the default transform name or enter a new name in the Transforms name field. For example, if you are masking a customer ID using the transform MaskField$Value you could enter the alias mask_userid.

    Transform name

  3. Select the transform type from the drop-down list.

    Transform selection

  4. After you select the transform, additional property fields are displayed. The following properties are displayed for MaskField$Value:

    • fields (required): Enter the names of the record fields you want to mask. In the example, the field userid will be masked.
    • replacement (optional): Enter a replacement string. In the example, the replacement for the userid will show ******.
    Configure transform options

    Note

    SMT configuration properties vary depending on the selected transform type. For transform configuration properties and descriptions, see the SMT docs for the transform you want to apply.

  5. After you have completed the first transform, you can add additional SMTs if you want. The connector applies transforms in the order in which they are created.

  6. When you are done adding SMTs, click Continue and complete the remaining connector startup tasks. The connector configuration is then displayed.

    Connector configuration

  7. (Optional) Click Request preview and verify that the data output in the "record" section is what you want. The preview will take a few minutes to generate. If the data preview shows what you want, launch the connector.

How to use SMTs with the Confluent CLI

Refer to the following docs for creating the JSON configuration file to load using the CLI:

  • The managed connector docs provide the configuration properties and a JSON file example for each connector.
  • The SMT docs provide the transformation configuration properties that you need to add to the basic connector configuration.
  • Refer to the SMT examples for configuration and transformation examples.
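As a sketch, assuming you have already run confluent login and selected an environment and Kafka cluster, loading a connector configuration that includes an SMT might look like the following. The file name and configuration contents are illustrative, and the exact subcommand can vary by CLI version:

```shell
# Write the connector configuration, including the SMT properties,
# to a local JSON file (contents are an illustrative fragment).
cat > datagen-smt.json <<'EOF'
{
  "connector.class": "DatagenSource",
  "name": "DatagenSourceSmtExample",
  "kafka.topic": "datagen-smt-example",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "orderid"
}
EOF

# Load the configuration with the Confluent CLI (requires an
# authenticated session and a selected environment/cluster):
# confluent connect cluster create --config-file datagen-smt.json
```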

Predicates

You can apply predicates to a transform using the CLI. When you apply a predicate to an SMT, the predicate instructs the connector to apply the SMT conditionally. In the predicate, you specify the predicate condition that the connector uses to evaluate each message that it processes.

When a record is evaluated, the connector first checks the record against the predicate condition. If the condition is true for the record, the connector applies the SMT to the record. Records that do not match the predicate are passed through unmodified. The negate option inverts the predicate. Setting negate to true applies the predicate only to records that do not match the defined predicate condition.

For examples showing how to use predicates, see Predicate Examples.
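Conceptually, the gating behavior works like the following sketch. This is an illustration of the predicate and negate semantics described above, not Connect's implementation:

```python
def apply_with_predicate(record, smt, predicate, negate=False):
    """Apply `smt` to `record` only when the predicate (optionally
    negated) matches; otherwise pass the record through unmodified."""
    matches = predicate(record)
    if negate:
        matches = not matches
    return smt(record) if matches else record

# Example: mask the value only for records that are NOT tombstones.
is_tombstone = lambda r: r.get("value") is None
mask = lambda r: {**r, "value": "****"}

live = {"key": "User_1", "value": "v"}
tomb = {"key": "User_2", "value": None}

apply_with_predicate(live, mask, is_tombstone, negate=True)  # value masked
apply_with_predicate(tomb, mask, is_tombstone, negate=True)  # unchanged
```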

Failed records

Note the following behavior when an SMT is misconfigured:

For source connectors, the connector fails.

For sink connectors, the connector continues running. Failed records are sent to the Dead Letter Queue (DLQ), which shows the full stack trace for each failed record.

Note

See Transform alias validation for additional information.

SMTs and Data Preview

Data preview creates one record for each sequential transformation applied to a Kafka record, plus one record for the original, untransformed record. If you configure n transforms, data preview therefore creates n + 1 records. Keep this in mind when you preview transformations.

In the following example, a source connector is configured with two SMTs. Note that the transformation steps in the figures are represented by current_step:n out of total_step:n. The entries transformation_name and transformation_type show what transformation is being applied at the step.

  1. The first record is the actual source record received by the connector. No transformations are applied yet.

    Data preview (1 of 3): no transformations applied

  2. The first transformation (ValueToKey) is applied to the Kafka record. The Kafka record key is set as the gender field value.

    Data preview (2 of 3): first transformation applied

  3. The second transformation (MaskField$Value) is applied to the Kafka record. The Kafka record value is transformed and the gender field value is masked.

    Data preview (3 of 3): second transformation applied

SMT Limitations

Note the following limitations.

Transform alias validation

Several managed connectors already have internal transformations. If you add a transformation alias that conflicts with the alias present in the connector’s internal configuration template, Connect throws a validation error similar to the following example:

Invalid value [internalxform, ..., internalxform] for configuration transforms: Duplicate alias provided.

In the error message above, internalxform is the internal alias present in the configuration template that conflicts with the added alias. You can’t use the following aliases in your transformation.

Managed Connector                            Connector plugin   Internal alias
Amazon S3 Sink                               S3_SINK            requireTimestampTransform
Google BigQuery Sink                         BigQuerySink       requireMapTransform
HTTP Sink                                    HttpSink           requireTimestampTransform
Microsoft SQL Server CDC Source (Debezium)   SqlServerCDC       unwrap
MySQL CDC Source (Debezium)                  MySqlCDC           unwrap
Postgres CDC Source (Debezium)               PostgresCDC        unwrap

Unsupported transformations

  • The following SMTs are not currently supported for managed connectors:
    • RegexRouter: For source connectors, the alternative is to use the TopicRegexRouter SMT.
    • InsertHeader
    • DropHeaders
  • Certain sink connectors do not support the following transformations:
    • org.apache.kafka.connect.transforms.TimestampRouter
    • io.confluent.connect.transforms.MessageTimestampRouter
    • io.confluent.connect.transforms.ExtractTopic$Header
    • io.confluent.connect.transforms.ExtractTopic$Key
    • io.confluent.connect.transforms.ExtractTopic$Value
    • io.confluent.connect.cloud.transforms.TopicRegexRouter
  • Certain source connectors do not support the following transformations:
    • org.apache.kafka.connect.transforms.HoistField$Value
    • org.apache.kafka.connect.transforms.ValueToKey

SMT list

Note

Not all of the listed SMTs are supported by all managed connectors. See Unsupported transformations.

  • Cast: Cast fields or the entire key or value to a specific type (for example, to force an integer field to a smaller width).
  • Drop: Drop either the key or the value of a record and set it to null.
  • DropHeaders: Not currently available for managed connectors. Drop one or more headers from each record.
  • ExtractField: Extract the specified field from a Struct when a schema is present, or from a Map in the case of schemaless data. Null values are passed through unmodified.
  • ExtractTopic: Replace the record topic with a new topic derived from its key or value.
  • Filter (Apache Kafka): Drop all records. Designed to be used in conjunction with a predicate.
  • Filter (Confluent): Include or drop records that match a configurable filter.condition.
  • Flatten: Flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character.
  • GzipDecompress: Not currently available for managed connectors. Gzip-decompress the entire byteArray key or value input.
  • HeaderFrom: Not currently available for managed connectors. Move or copy fields in a record key or value into the record’s headers.
  • HoistField: Wrap data using the specified field name in a Struct when a schema is present, or in a Map in the case of schemaless data.
  • InsertField: Insert a field using attributes from the record metadata or a configured static value.
  • InsertHeader: Not currently available for managed connectors. Insert a literal value as a record header.
  • MaskField: Mask specified fields with a valid null value for the field type.
  • MessageTimestampRouter: Update the record’s topic field as a function of the original topic value and the record’s timestamp field.
  • RegexRouter: Not currently available for managed connectors. Update the record topic using the configured regular expression and replacement string.
  • ReplaceField: Filter or rename fields.
  • SetSchemaMetadata: Set the schema name, version, or both on the record’s key or value schema.
  • TimestampConverter: Convert timestamps between different formats, such as Unix epoch, strings, and Connect Date and Timestamp types.
  • TimestampRouter: Update the record’s topic field as a function of the original topic value and the record timestamp.
  • TombstoneHandler: Manage tombstone records. A tombstone record is a record whose entire value field is null, whether or not it has a ValueSchema.
  • TopicRegexRouter: Only available for managed source connectors. Update the record topic using the configured regular expression and replacement string.
  • ValueToKey: Replace the record key with a new key formed from a subset of fields in the record value.

SMT examples

The following examples show connector configuration and transformation examples for source and sink connectors. The examples show configurations for the Datagen Source and the Google Cloud Functions Sink connectors.

Note

  • Several SMTs (for example, Cast) may be applied to either the record Value or the record Key. Using Cast as the example, the connector applies org.apache.kafka.connect.transforms.Cast$Value to the Kafka record value. It applies org.apache.kafka.connect.transforms.Cast$Key to the Kafka record key.
  • See the SMT documentation for SMT definitions, configuration property values, and additional examples.

Cast

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-cast",
  "max.interval": "3000",
  "name": "DatagenSourceSmtCast",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "castValues",
  "transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
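The effect of the castValues spec above can be sketched in plain Python. This is an illustration of the resulting types for top-level fields (using a flattened version of the record), not the SMT's implementation:

```python
def cast_fields(value, spec):
    """Cast the named top-level fields to the target Connect types
    (a simplified model: float64 -> float, string -> str, int32 -> int)."""
    casters = {"float64": float, "string": str, "int32": int}
    return {k: casters[spec[k]](v) if k in spec else v for k, v in value.items()}

flat = {"ordertime": 1512446289869, "orderid": 417794,
        "itemid": "Item_430", "orderunits": 5.085317150755766,
        "zipcode": 72688}
spec = {"zipcode": "float64", "orderid": "string", "orderunits": "int32"}
cast_fields(flat, spec)
# orderid becomes "417794", orderunits is truncated to 5, zipcode becomes 72688.0
```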

Drop

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-drop",
  "max.interval": "3000",
  "name": "DatagenSourceSmtDrop",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Extract Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "extractAddress",
  "transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractAddress.field": "address"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Extract Topic

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-topic",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractTopic",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setTopic",
  "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.setTopic.field": "itemid"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Filter (Apache Kafka)

Note

This example also shows a RecordIsTombstone predicate.

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-ak",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterAk",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "predicates": "isNull",
  "predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms": "dropValue, filterNull",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterNull.predicate": "isNull"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Filter (Confluent)

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-cp",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterCp",
  "output.data.format": "JSON",
  "quickstart": "RATINGS",
  "tasks.max": "1",
  "transforms": "filterValue",
  "transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
  "transforms.filterValue.filter.type": "include",
  "transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}

Transformation example:

[
  {
    "rating_id": 140,
    "user_id": 3,
    "stars": 4,
    "route_id": 7425,
    "rating_time": 1669,
    "channel": "ios",
    "message": "thank you for the most friendly, helpful experience today at your new lounge"
  },
  {
    "rating_id": 491,
    "user_id": 7,
    "stars": 4,
    "route_id": 3302,
    "rating_time": 5881,
    "channel": "iOS-test",
    "message": "more peanuts please"
  }
]
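With filter.type set to include, only records whose value satisfies filter.condition are kept; note that the channel value "iOS-test" does not match the exact string 'ios'. A rough sketch of that include behavior (not a JSONPath implementation):

```python
def include_filter(records, condition):
    """Keep only the records for which `condition` is true
    (models filter.type=include; 'exclude' would invert this)."""
    return [r for r in records if condition(r)]

ratings = [
    {"rating_id": 140, "channel": "ios"},
    {"rating_id": 491, "channel": "iOS-test"},
]
include_filter(ratings, lambda r: r["channel"] == "ios")
# keeps only the record with rating_id 140
```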

Flatten

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-flatten",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFlatten",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}

Transformation example:

{
  "ordertime": 1491310657544,
  "orderid": 9,
  "itemid": "Item_826",
  "orderunits": 4.188698361592631,
  "address": {
    "city": "City_73",
    "state": "State_47",
    "zipcode": 54450
  }
}
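The flattening behavior with the "_" delimiter can be sketched as follows. This models the field-name concatenation described in the SMT list, not the SMT's implementation:

```python
def flatten(value, delimiter="_", prefix=""):
    """Recursively flatten nested dicts, joining field names at each
    level with the delimiter (models Flatten$Value with delimiter '_')."""
    out = {}
    for k, v in value.items():
        name = f"{prefix}{delimiter}{k}" if prefix else k
        if isinstance(v, dict):
            out.update(flatten(v, delimiter, name))
        else:
            out[name] = v
    return out

record = {"orderid": 9, "address": {"city": "City_73", "zipcode": 54450}}
flatten(record)
# → {"orderid": 9, "address_city": "City_73", "address_zipcode": 54450}
```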

Hoist Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-hoist-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtHoistField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "hoist",
  "transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
  "transforms.hoist.field": "wrapperField"
}

Transformation example:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}

Insert Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-insert-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtInsertField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "insert",
  "transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insert.partition.field": "PartitionField",
  "transforms.insert.static.field": "InsertedStaticField",
  "transforms.insert.static.value": "SomeValue",
  "transforms.insert.timestamp.field": "TimestampField",
  "transforms.insert.topic.field": "TopicField"
}

Transformation example:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}

Mask Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-mask-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtMaskField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "gender",
  "transforms.mask.replacement": "REDACTED"
}

Transformation example:

{
  "registertime": 1499746213074,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "MALE"
}
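The masking behavior can be sketched as follows: with a replacement configured, the field takes the replacement string; without one, MaskField uses a null-equivalent value for the field type. An illustration, not the SMT's implementation:

```python
def mask_fields(value, fields, replacement=None):
    """Replace the named fields with `replacement`, or with a
    type-appropriate empty value when no replacement is given."""
    empty = {str: "", int: 0, float: 0.0, bool: False}
    return {k: (replacement if replacement is not None else empty.get(type(v)))
            if k in fields else v
            for k, v in value.items()}

user = {"userid": "User_5", "regionid": "Region_4", "gender": "MALE"}
mask_fields(user, {"gender"}, replacement="REDACTED")
# gender becomes "REDACTED"; with no replacement it would become ""
```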

Message Timestamp Router

Configuration examples:

Not currently available

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
  "topic": "msg_timestamp_router_topic",
  "partition": 0,
  "offset": 812925,
  "timestamp": 1628486671963
}

Replace Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-replace-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtReplaceField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "replacefield",
  "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.replacefield.exclude": "userid",
  "transforms.replacefield.include": "regionid",
  "transforms.replacefield.renames": "regionid:Region_Id"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "regionid": "Region_3",
    "ModifiedBy": "XYZ"
  }
}

Set Schema Metadata

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-set-schema-metadata",
  "max.interval": "3000",
  "name": "DatagenSourceSmtSetSchemaMetadata",
  "output.data.format": "AVRO",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setSchemaMetadata",
  "transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
  "transforms.setSchemaMetadata.schema.name": "schema_name",
  "transforms.setSchemaMetadata.schema.version": "12"
}

Transformation example:

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}

Timestamp Converter

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-ts-converter",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTimestampConverter",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "tsConverter",
  "transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.tsConverter.field": "ordertime",
  "transforms.tsConverter.format": "yyyy-MM-dd",
  "transforms.tsConverter.target.type": "string"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "ordertime": 1628035200000,
    "orderid": "ABC",
    "itemid": "XYZ"
  }
}
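The conversion configured above (epoch milliseconds to a yyyy-MM-dd string) can be sketched as follows, assuming UTC:

```python
from datetime import datetime, timezone

def ts_to_string(epoch_ms, fmt="%Y-%m-%d"):
    """Convert a Unix-epoch-milliseconds field to a formatted string
    (models TimestampConverter with format yyyy-MM-dd, target.type string)."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).strftime(fmt)

ts_to_string(1628035200000)
# → "2021-08-04"
```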

Timestamp Router

Configuration examples:

Not currently available

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
  "topic": "topic"
}

Tombstone Handler

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-tombstone-handler",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTombstoneHandler",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue,tombstoneFail",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
  "transforms.tombstoneFail.behavior": "fail"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": null
}

Topic Regex Router

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-users-json",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTopicRegexRouter",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "addPrefixRegex",
  "transforms.addPrefixRegex.type": "io.confluent.connect.cloud.transforms.TopicRegexRouter",
  "transforms.addPrefixRegex.regex": ".*",
  "transforms.addPrefixRegex.replacement": "prefix_$0"
}

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
  "topic": "datagen-source-users-json",
  "partition": 0,
  "offset": 812925,
  "timestamp": 1628486671963
}

Value To Key

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-value-to-key",
  "max.interval": "3000",
  "name": "DatagenSourceSmtValueToKey",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "valueToKey",
  "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.valueToKey.fields": "registertime, userid"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "registertime": "ABC",
    "ModifiedBy": "XYZ"
  }
}
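The key replacement configured above can be sketched as follows. This illustrates how the new key is formed from the named value fields; it is not the SMT's implementation:

```python
def value_to_key(record, fields):
    """Replace the record key with a Struct-like dict built from the
    named fields of the record value (models ValueToKey)."""
    return {**record, "key": {f: record["value"][f] for f in fields}}

record = {"topic": "TestTopic", "key": {"test": "value"},
          "value": {"userid": 1234, "registertime": "ABC", "ModifiedBy": "XYZ"}}
value_to_key(record, ["registertime", "userid"])
# key becomes {"registertime": "ABC", "userid": 1234}
```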