Configure Single Message Transforms for Kafka Connectors in Confluent Cloud

Confluent Cloud fully-managed connectors can be configured with one or more single message transforms (SMTs) to make simple and lightweight modifications to message values, keys, and headers. This can be convenient for inserting fields, masking information, event routing, and other minor data adjustments. SMTs can be used with sink and source connectors.

When multiple SMTs are used with a source connector, Connect passes each source record produced by the connector through the first SMT, which makes its modifications. The updated source record is then passed to the next SMT in the chain. This sequence continues for any additional SMTs in the chain. The final updated source record is converted to binary format (bytes) and written to Apache Kafka®.

When multiple SMTs are used with a sink connector, Connect first reads the record from Kafka and converts the bytes to a sink record. Connect then passes the record through the first SMT, which makes its modifications. The updated sink record is passed to the next SMT in the chain, and this continues for the remaining SMTs. The final updated sink record is then passed to the sink connector for processing.
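
For example, a connector configured with two chained SMTs lists the transform aliases, in order, in the transforms property and configures each step with transforms.<alias>.* properties. The following fragment is a minimal sketch (the aliases and property values are illustrative; both transform classes appear in the SMT list later in this page):

{
  "transforms": "castValues,maskUserId",
  "transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castValues.spec": "orderid:string",
  "transforms.maskUserId.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskUserId.fields": "userid"
}

Here, castValues runs first and maskUserId runs second, because that is their order in the transforms list.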

[Image: Single Message Transforms for Managed Connectors]

Note the following additional details when using this feature:

  • SMTs are very useful, but should only be used for simple data transformations. More sophisticated transformations and data integrations should be handled using ksqlDB and Kafka stream processing.
  • There is currently a limit of 10 SMT instances per connector.
  • See SMT Limitations for additional information.

How to use SMTs with the Cloud Console

The following steps show how to set up an SMT using the Confluent Cloud Console. The steps use the Datagen Source Connector for Confluent Cloud.

Note

The connector applies transforms in the order in which they are created.

Add transforms

  1. Go to the advanced configurations section of the connector UI and click Add SMT.

    [Image: Single Message Transforms]

  2. Accept the default transform name or enter a new name in the Transforms name field, and then select the transform type you want to apply. For example, if you are masking a customer ID using the MaskField$Value transform, you could enter the name mask_userid.

    [Image: Transform name]

  3. After you select the transform, additional property fields are displayed. The following properties are displayed for MaskField$Value:

    • fields (required): Enter the names of the record fields you want to mask. In the example, the field userid will be masked.
    • replacement (optional): Enter a replacement string. In the example, the masked userid is displayed as ******.

    [Image: Configure transform options]

    Note

    SMT configuration properties vary depending on the selected transform type. For transform configuration properties and descriptions, see the SMT docs for the transform you want to apply.

  4. After you have completed the first transform, you can click Add new SMT to add another SMT. If you are done adding SMTs and want to add a predicate to an SMT, click Add Predicate. For details about adding predicates, see Add, link, and delete predicates.

  5. When you are done adding SMTs, click Continue and complete any remaining connector startup tasks.

  6. (Optional) Click Request preview and verify that the data output in the "record" section is what you want. The preview will take a few minutes to generate. If the data preview shows what you want, launch the connector.

How to use SMTs with the Confluent CLI

Refer to the following docs for creating the JSON configuration file to load using the CLI (a combined example sketch follows the list):

  • The managed connector docs provide the configuration properties and a JSON file example for each connector.
  • The SMT docs provide the transformation configuration properties that you need to add to the basic connector configuration.
  • Refer to the SMT examples for configuration and transformation examples.
  • For examples showing how to use predicates, see Predicate Examples.
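
Putting these pieces together, a minimal configuration file for a connector with one SMT and one predicate might look like the following sketch (the topic and credential values are placeholders; the transform and predicate classes are the same ones used in the Filter (Apache Kafka) example later in this page). You would then load the file with the Confluent CLI, for example with a command such as confluent connect cluster create --config-file <file> (check the CLI reference for the exact command in your CLI version):

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "orders",
  "name": "DatagenSourceWithSmt",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms": "filterNull",
  "transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterNull.predicate": "isTombstone"
}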

Failed records

Note the following troubleshooting information:

  • For source connectors: if an SMT is misconfigured, the connector fails.
  • For sink connectors: the connector continues running. Failed records are sent to the Confluent Cloud Dead Letter Queue (DLQ). The DLQ shows the full stack trace for a failed record.

Note

See Transform alias validation for additional information.

SMTs and Data Preview

For SMTs, data preview creates one record for each transformation applied to a Kafka record: the first record shows the original Kafka record with no transformations applied, and each subsequent record shows the result of the next transformation in the chain. For this reason, if you configure n transforms, data preview creates n + 1 records. Keep this in mind when you preview transformations.

In the following example, a source connector is configured with two SMTs. Note that the transformation steps in the figures are represented by current_step:n out of total_step:n. The entries transformation_name and transformation_type show what transformation is being applied at the step.
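
As a sketch, the preview metadata for the second of two transformation steps might look like the following (the field names are taken from the figures below; the alias and exact preview layout are illustrative and may differ):

{
  "current_step": 2,
  "total_step": 2,
  "transformation_name": "mask_gender",
  "transformation_type": "org.apache.kafka.connect.transforms.MaskField$Value"
}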

  1. The first record is the actual source record received by the connector. No transformations are applied yet.

    [Image: Single Message Transforms and Data Preview (1 of 3): No transformations applied]

  2. The first transformation (ValueToKey) is applied to the Kafka record. The Kafka record key is set as the gender field value.

    [Image: Single Message Transforms and Data Preview (2 of 3): First transformation applied]

  3. The second transformation (MaskField$Value) is applied to the Kafka record. The Kafka record value is transformed and the gender field value is masked.

    [Image: Single Message Transforms and Data Preview (3 of 3): Second transformation applied]

SMT Limitations

Note the following limitations.

Transform alias validation

Several fully-managed connectors already have internal transformations. If you add a transformation alias that conflicts with the alias present in the connector’s internal configuration template, Connect throws a validation error similar to the following example:

Invalid value [internalxform, ..., internalxform] for configuration transforms: Duplicate alias provided.

In the error message above, internalxform is the internal alias present in the configuration template that conflicts with the added alias. You can’t use the following internal aliases in your transformations:

Managed Connector                                   | Connector plugin     | Internal alias
Amazon S3 Sink                                      | S3_SINK              | requireTimestampTransform
Google BigQuery Sink                                | BigQuerySink         | requireMapTransform
HTTP Sink                                           | HttpSink             | requireTimestampTransform
Microsoft SQL Server CDC Source (Debezium) [Legacy] | SqlServerCdcSource   | unwrap
Microsoft SQL Server CDC Source V2 (Debezium)       | SqlServerCdcSourceV2 | unwrap
MySQL CDC Source (Debezium) [Legacy]                | MySqlCdcSource       | unwrap
MySQL CDC Source V2 (Debezium)                      | MySqlCdcSourceV2     | unwrap
Postgres CDC Source (Debezium) [Legacy]             | PostgresCdcSource    | unwrap
Postgres CDC Source V2 (Debezium)                   | PostgresCdcSourceV2  | unwrap

Debezium transformations

The ExtractNewRecordState and EventRouter (Debezium) SMTs are available for the managed Debezium CDC source connectors listed in the table above.

Note the following when configuring the ExtractNewRecordState SMT (a configuration sketch follows this list):

  • You must set the configuration property after.state.only to false.
  • Only after setting after.state.only to false in the connector UI can you set the transform alias to unwrap.
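
As a sketch, the relevant fragment of a managed Debezium source connector configuration with this SMT might look like the following (other required connector properties are omitted; the transform class name assumed here is the standard Debezium one):

{
  "after.state.only": "false",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}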

Unsupported transformations

  • The following SMTs are not currently supported for fully-managed connectors:
    • RegexRouter: For source connectors, the alternative is to use the TopicRegexRouter SMT.
    • InsertHeader
    • DropHeaders
  • Certain sink connectors do not support the following transformations:
    • org.apache.kafka.connect.transforms.TimestampRouter
    • io.confluent.connect.transforms.MessageTimestampRouter
    • io.confluent.connect.transforms.ExtractTopic$Header
    • io.confluent.connect.transforms.ExtractTopic$Key
    • io.confluent.connect.transforms.ExtractTopic$Value
    • io.confluent.connect.cloud.transforms.TopicRegexRouter
  • Certain source connectors do not support the following transformations:
    • org.apache.kafka.connect.transforms.ValueToKey
    • org.apache.kafka.connect.transforms.HoistField$Value

SMT list

Note

Not all of the listed SMTs are supported by all fully-managed connectors. See Unsupported transformations.

Transform                | Description
Cast                     | Cast fields or the entire key or value to a specific type (for example, to force an integer field to a smaller width).
Drop                     | Drop either a key or a value from a record and set it to null.
DropHeaders              | Not currently available for managed connectors. Drop one or more headers from each record.
EventRouter              | Only available for managed Debezium connectors. Route Debezium outbox events using a regex configuration option.
ExtractField             | Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. Any null values are passed through unmodified.
ExtractTopic             | Replace the record topic with a new topic derived from its key or value.
Filter (Apache Kafka)    | Drop all records. Designed to be used in conjunction with a Predicate.
Filter (Confluent)       | Include or drop records that match a configurable filter.condition.
Flatten                  | Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character.
GzipDecompress           | Not currently available for managed connectors. Gzip-decompress the entire byteArray key or value input.
HeaderFrom               | Not currently available for managed connectors. Moves or copies fields in a record key or value into the record’s header.
HoistField               | Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data.
InsertField              | Insert field using attributes from the record metadata or a configured static value.
InsertHeader             | Not currently available for managed connectors. Insert a literal value as a record header.
MaskField                | Mask specified fields with a valid null value for the field type.
MessageTimestampRouter   | Update the record’s topic field as a function of the original topic value and the record’s timestamp field.
RegexRouter              | Not currently available for managed connectors. Update the record topic using the configured regular expression and replacement string.
ReplaceField             | Filter or rename fields.
SetSchemaMetadata        | Set the schema name, version, or both on the record’s key or value schema.
TimestampConverter       | Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types.
TimestampRouter          | Update the record’s topic field as a function of the original topic value and the record timestamp.
TombstoneHandler         | Manage tombstone records. A tombstone record is defined as a record with the entire value field being null, whether or not it has a ValueSchema.
TopicRegexRouter         | Only available for managed source connectors. Update the record topic using the configured regular expression and replacement string.
ValueToKey               | Replace the record key with a new key formed from a subset of fields in the record value.

SMT examples

The following examples show connector configuration and transformation examples for source and sink connectors. The examples show configurations for the Datagen Source and the Google Cloud Functions Sink connectors. See the SMT documentation for SMT definitions, configuration property values, and additional examples.

Note that several SMTs (for example, Cast) may be applied to either the record Value or the record Key. Using Cast as the example, the connector applies org.apache.kafka.connect.transforms.Cast$Value to the Kafka record value. It applies org.apache.kafka.connect.transforms.Cast$Key to the Kafka record key.

Cast

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-cast",
  "max.interval": "3000",
  "name": "DatagenSourceSmtCast",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "castValues",
  "transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
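
The record above is the input. Assuming the standard Cast behavior, which applies only to top-level fields (so the nested address.zipcode is left unchanged), the expected output would be along these lines (a derived sketch, not the output of a live run):

{
  "ordertime": 1512446289869,
  "orderid": "417794",
  "itemid": "Item_430",
  "orderunits": 5,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}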

Drop

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-drop",
  "max.interval": "3000",
  "name": "DatagenSourceSmtDrop",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
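
The record above is the input. After Drop$Value, the record key and metadata are unchanged, but the value is simply:

null

In other words, the transform turns each record into a tombstone.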

Extract Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "extractAddress",
  "transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractAddress.field": "address"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
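
The record above is the input. After ExtractField$Value with field set to address, the record value is replaced by the extracted field (a derived sketch):

{
  "city": "City_19",
  "state": "State_88",
  "zipcode": 72688
}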

Extract Topic

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-topic",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractTopic",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setTopic",
  "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.setTopic.field": "itemid"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
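
The record above is the input. The record value itself is unchanged; instead, the record is routed to a topic named after its itemid field value, so this record would be written to the topic Item_430 rather than datagen-source-smt-extract-topic. Because itemid varies per record, each distinct value yields its own topic.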

Filter (Apache Kafka)

Note

This example also includes a RecordIsTombstone predicate.

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-ak",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterAk",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "predicates": "isNull",
  "predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms": "dropValue, filterNull",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterNull.predicate": "isNull"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
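
The record above is the input. In this chain, dropValue first turns each record into a tombstone (value null), the isNull predicate then matches every record, and Filter drops them, so no records are written to the topic.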

Filter (Confluent)

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-cp",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterCp",
  "output.data.format": "JSON",
  "quickstart": "RATINGS",
  "tasks.max": "1",
  "transforms": "filterValue",
  "transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
  "transforms.filterValue.filter.type": "include",
  "transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}

Transformation example:

[
  {
    "rating_id": 140,
    "user_id": 3,
    "stars": 4,
    "route_id": 7425,
    "rating_time": 1669,
    "channel": "ios",
    "message": "thank you for the most friendly, helpful experience today at your new lounge"
  },
  {
    "rating_id": 491,
    "user_id": 7,
    "stars": 4,
    "route_id": 3302,
    "rating_time": 5881,
    "channel": "iOS-test",
    "message": "more peanuts please"
  }
]
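
The two records above are candidate inputs. With filter.type set to include and the condition $[?(@.channel == 'ios')], only records whose channel is exactly ios are kept; the second record (channel iOS-test) does not match and is dropped. The expected output is therefore (a derived sketch):

[
  {
    "rating_id": 140,
    "user_id": 3,
    "stars": 4,
    "route_id": 7425,
    "rating_time": 1669,
    "channel": "ios",
    "message": "thank you for the most friendly, helpful experience today at your new lounge"
  }
]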

Flatten

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-flatten",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFlatten",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}

Transformation example:

{
  "ordertime": 1491310657544,
  "orderid": 9,
  "itemid": "Item_826",
  "orderunits": 4.188698361592631,
  "address": {
    "city": "City_73",
    "state": "State_47",
    "zipcode": 54450
  }
}
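
The record above is the input. With the delimiter _, the nested address fields are lifted to the top level (a derived sketch):

{
  "ordertime": 1491310657544,
  "orderid": 9,
  "itemid": "Item_826",
  "orderunits": 4.188698361592631,
  "address_city": "City_73",
  "address_state": "State_47",
  "address_zipcode": 54450
}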

Hoist Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-hoist-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtHoistField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "hoist",
  "transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
  "transforms.hoist.field": "wrapperField"
}

Transformation example:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}
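
The record above is the input. After HoistField$Value, the entire value is wrapped in a single field named wrapperField (a derived sketch):

{
  "wrapperField": {
    "registertime": 1506959883575,
    "userid": "User_2",
    "regionid": "Region_1",
    "gender": "MALE"
  }
}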

Insert Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-insert-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtInsertField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "insert",
  "transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insert.partition.field": "PartitionField",
  "transforms.insert.static.field": "InsertedStaticField",
  "transforms.insert.static.value": "SomeValue",
  "transforms.insert.timestamp.field": "TimestampField",
  "transforms.insert.topic.field": "TopicField"
}

Transformation example:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}
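
The record above is the input. The transform adds the configured fields to the value; the partition and timestamp values below are illustrative, since they depend on the record at runtime (a derived sketch):

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE",
  "PartitionField": 0,
  "InsertedStaticField": "SomeValue",
  "TimestampField": 1506959883999,
  "TopicField": "datagen-source-smt-insert-field"
}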

Mask Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-mask-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtMaskField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "gender",
  "transforms.mask.replacement": "REDACTED"
}

Transformation example:

{
  "registertime": 1499746213074,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "MALE"
}
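
The record above is the input. With fields set to gender and replacement set to REDACTED, the expected output is (a derived sketch):

{
  "registertime": 1499746213074,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "REDACTED"
}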

Message Timestamp Router

Configuration examples:

Not currently available

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
  "topic": "msg_timestamp_router_topic",
  "partition": 0,
  "offset": 812925,
  "timestamp": 1628486671963
}

Replace Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-replace-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtReplaceField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "replacefield",
  "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.replacefield.exclude": "userid",
  "transforms.replacefield.include": "regionid",
  "transforms.replacefield.renames": "regionid:Region_Id"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "regionid": "Region_3",
    "ModifiedBy": "XYZ"
  }
}
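
The record above is the input. exclude drops userid, include keeps only regionid, and renames then renames regionid to Region_Id, so the expected output is (a derived sketch):

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "Region_Id": "Region_3"
  }
}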

Set Schema Metadata

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-set-schema-metadata",
  "max.interval": "3000",
  "name": "DatagenSourceSmtSetSchemaMetadata",
  "output.data.format": "AVRO",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setSchemaMetadata",
  "transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
  "transforms.setSchemaMetadata.schema.name": "schema_name",
  "transforms.setSchemaMetadata.schema.version": "12"
}

Transformation example:

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}
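
The schema above is the input value schema. After the transform, the schema name is schema_name and the version is 12; with Avro output, the converter typically reflects these in the schema name and the connect.name and connect.version properties (a hedged sketch; the exact Avro rendering may differ):

{
  "connect.name": "schema_name",
  "connect.version": 12,
  "fields": [
    { "name": "registertime", "type": "long" },
    { "name": "userid", "type": "string" },
    { "name": "regionid", "type": "string" },
    { "name": "gender", "type": "string" }
  ],
  "name": "schema_name",
  "type": "record"
}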

Timestamp Converter

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-ts-converter",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTimestampConverter",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "tsConverter",
  "transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.tsConverter.field": "ordertime",
  "transforms.tsConverter.format": "yyyy-MM-dd",
  "transforms.tsConverter.target.type": "string"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "ordertime": 1628035200000,
    "orderid": "ABC",
    "itemid": "XYZ"
  }
}
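
The record above is the input. The value 1628035200000 is milliseconds since the Unix epoch, which is midnight UTC on 2021-08-04, so with format yyyy-MM-dd and target.type string the expected output is (a derived sketch, assuming UTC):

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "ordertime": "2021-08-04",
    "orderid": "ABC",
    "itemid": "XYZ"
  }
}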

Timestamp Router

Configuration examples:

Not currently available

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
  "topic": "topic"
}

Tombstone Handler

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-tombstone-handler",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTombstoneHandler",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue,tombstoneFail",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
  "transforms.tombstoneFail.behavior": "fail"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": null
}
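
The record above shows a tombstone (value null) like the ones produced by the dropValue step. Because the tombstoneFail step is configured with behavior set to fail, the first tombstone causes the task to throw an error and fail rather than write the record.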

Topic Regex Router

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-users-json",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTopicRegexRouter",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "addPrefixRegex",
  "transforms.addPrefixRegex.type": "io.confluent.connect.cloud.transforms.TopicRegexRouter",
  "transforms.addPrefixRegex.regex": ".*",
  "transforms.addPrefixRegex.replacement": "prefix_$0"
}

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
  "topic": "datagen-source-users-json",
  "partition": 0,
  "offset": 812925,
  "timestamp": 1628486671963
}
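
The record above is the input. The regex .* captures the entire topic name and the replacement prefix_$0 prepends prefix_, so the record would be routed to the topic prefix_datagen-source-users-json instead of datagen-source-users-json.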

Value To Key

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-value-to-key",
  "max.interval": "3000",
  "name": "DatagenSourceSmtValueToKey",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "valueToKey",
  "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.valueToKey.fields": "registertime, userid"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "registertime": "ABC",
    "ModifiedBy": "XYZ"
  }
}
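
The record above is the input. With fields set to registertime and userid, the record key is replaced by a new structure built from those value fields, while the value is unchanged (a derived sketch):

{
  "topic": "TestTopic",
  "key": {
    "registertime": "ABC",
    "userid": 1234
  },
  "value": {
    "userid": 1234,
    "registertime": "ABC",
    "ModifiedBy": "XYZ"
  }
}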