Single Message Transforms for Managed Connectors

Confluent Cloud managed connectors can be configured with one or more single message transforms (SMTs) to make simple and lightweight modifications to message values, keys, and headers. This can be convenient for inserting fields, masking information, event routing, and other minor data adjustments. SMTs can be used with sink and source connectors.

When multiple SMTs are used with a source connector, Connect passes each source record produced by the connector through the first SMT, which makes its modifications. The updated source record is then passed to the next SMT in the chain. This sequence continues for any additional SMTs in the chain. The final updated source record is converted to binary format (bytes) and written to Apache Kafka®.

When multiple SMTs are used with a sink connector, Connect first reads the record from Kafka and converts the bytes to a sink record. Connect then passes the record through the first SMT, which makes its modifications. The updated sink record is then passed to the next SMT in the chain. This continues for the remaining SMTs. The final updated sink record is then passed to the sink connector for processing.
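The chaining described above can be illustrated with a minimal Python sketch. This is not Connect code: the record layout (a dict with key and value) and the two transform functions are hypothetical stand-ins for real SMTs. It only shows that each transform receives the output of the previous one, so the order of the chain changes the result.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Transform = Callable[[Record], Record]


def userid_to_key(record: Record) -> Record:
    # Hypothetical SMT: copy the userid field of the value into the key.
    return {**record, "key": record["value"]["userid"]}


def mask_userid(record: Record) -> Record:
    # Hypothetical SMT: replace the userid field of the value with asterisks.
    return {**record, "value": {**record["value"], "userid": "******"}}


def apply_chain(record: Record, transforms: List[Transform]) -> Record:
    # Each SMT receives the record returned by the previous one.
    for transform in transforms:
        record = transform(record)
    return record


source_record = {"key": None, "value": {"userid": "User_5", "regionid": "Region_4"}}

key_then_mask = apply_chain(source_record, [userid_to_key, mask_userid])
mask_then_key = apply_chain(source_record, [mask_userid, userid_to_key])

print(key_then_mask["key"])  # User_5
print(mask_then_key["key"])  # ******
```

In the first chain the key is taken before masking, so it keeps the original user ID. In the second chain the key is built from the already masked value.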

How to use SMTs with the Cloud Console

The following steps show how to configure SMTs in the Confluent Cloud Console, using the Datagen Source Connector for Confluent Cloud as the example.

Note

The following steps assume you have already started configuring the connector in the Cloud Console.

  1. In the Cloud Console connector configuration UI, there are two initial fields that support SMTs: Transforms and Predicates. Enter a meaningful name (alias) for the SMT in the Transforms field. For example, if you are masking a customer ID (MaskField$Value) you could enter the alias mask_userid.

    Figure: Single Message Transforms initial UI fields (Transforms)

  2. Click Add Transform and select the transformation org.apache.kafka.connect.transforms.MaskField$Value. The Transforms dialog box appears.

    Figure: Single Message Transform Mask Field

    Mask Field options

    • fields (required): Enter the names of the record fields you want to mask. In the example, the field userid will be masked.
    • negate (optional): The negate option inverts the predicate (if used). Selecting true applies the SMT only to records that do not match the defined predicate condition. If not used, this defaults to false.
    • predicate (optional): The predicate option associates a predicate with the SMT. If you want this SMT to be applied conditionally, enter the alias you created in the Predicates field.
    • replacement (optional): This field allows you to set a replacement string. In the example, the userid value is replaced with ******.

    Note

    SMT configuration properties vary depending on the selected transform type. For transform configuration properties and descriptions, see the SMT docs for the transform you want to apply.

  3. After you have completed the first transform, you can add additional SMTs if you want.

    Tip

    SMTs are processed in the order that aliases appear in the Transforms field.

  4. Using Predicates is optional. If you want to add a predicate, enter the predicate alias you want to use in the Predicates field. The predicate alias is used when configuring properties for the SMT. In the example, the predicate alias tombstone_records is used for the predicate org.apache.kafka.connect.transforms.predicates.RecordIsTombstone.

    Figure: Single Message Transforms initial UI fields (Predicates)

    When you apply a predicate to an SMT, the predicate instructs the connector to apply the SMT conditionally. In the predicate, you specify the condition that the connector uses to evaluate each record that it processes. The connector first checks the record against the predicate condition. If the condition is true for the record, the connector applies the SMT to the record. Records that do not match the condition are passed through unmodified. The negate option inverts the predicate: setting negate to true applies the SMT only to records that do not match the defined predicate condition.


  5. When you have added the Transforms and Predicates you want, click Next. The connector configuration is displayed.

    Figure: Single Message Transform connector configuration

  6. (Optional) Click the Data preview button and verify that the data output in the "record" section is what you want. The preview will take a few minutes to generate. If the data preview shows what you want, go back, add your credentials and launch the connector.

Once the connector is launched, you should see transformed records in the topic.

Figure: Single Message Transform records (transformed records)
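The predicate and negate behavior described in step 4 can be sketched in a few lines of Python. This is an illustration only, not Connect code: the record layout and the function names (is_tombstone, mask_userid, apply_conditionally) are hypothetical.

```python
from typing import Any, Callable, Dict, Optional

Record = Dict[str, Any]


def is_tombstone(record: Record) -> bool:
    # Stand-in for RecordIsTombstone: true when the record value is null.
    return record["value"] is None


def mask_userid(record: Record) -> Record:
    # Stand-in for MaskField$Value with the replacement "******".
    return {**record, "value": {**record["value"], "userid": "******"}}


def apply_conditionally(
    record: Record,
    transform: Callable[[Record], Record],
    predicate: Optional[Callable[[Record], bool]] = None,
    negate: bool = False,
) -> Record:
    # Without a predicate, the SMT is applied to every record.
    if predicate is None:
        return transform(record)
    matches = predicate(record)
    if negate:
        matches = not matches
    # Records that do not satisfy the (possibly negated) condition pass through unmodified.
    return transform(record) if matches else record


normal = {"key": "User_5", "value": {"userid": "User_5"}}
tombstone = {"key": "User_5", "value": None}

masked = apply_conditionally(normal, mask_userid, is_tombstone, negate=True)
passed_tombstone = apply_conditionally(tombstone, mask_userid, is_tombstone, negate=True)
passed_normal = apply_conditionally(normal, mask_userid, is_tombstone, negate=False)

print(masked["value"])         # {'userid': '******'}
print(passed_tombstone)        # unchanged tombstone
print(passed_normal["value"])  # {'userid': 'User_5'}
```

With negate set to true, the mask runs only on records that are not tombstones. With negate left at false, the same non-tombstone record passes through unmodified.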

How to use SMTs with the Confluent Cloud CLI

Refer to the following docs for creating the JSON configuration file to load using the CLI:

  • The managed connector docs provide the configuration properties and a JSON file example for each connector.
  • The SMT docs provide the transformation configuration properties that you need to add to the basic connector configuration.
  • Refer to the SMT examples for configuration and transformation examples.
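As a rough sketch, the SMT and predicate configured in the Cloud Console steps above might be expressed as connector properties like the following. The Python script only assembles and prints the JSON. The topic name, the connector name, and the choice of negate set to true are assumptions made for illustration; check the connector and SMT docs for the exact properties your connector needs.

```python
import json

# Basic Datagen Source properties, modeled on the SMT examples in this document.
base_config = {
    "connector.class": "DatagenSource",
    "kafka.api.key": "${KEY}",
    "kafka.api.secret": "${SECRET}",
    "kafka.topic": "datagen-source-smt-mask-userid",  # hypothetical topic name
    "name": "DatagenSourceSmtMaskUserid",  # hypothetical connector name
    "output.data.format": "JSON",
    "quickstart": "USERS",
    "tasks.max": "1",
}

# SMT and predicate properties, using the aliases from the Cloud Console steps.
smt_config = {
    "transforms": "mask_userid",
    "transforms.mask_userid.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask_userid.fields": "userid",
    "transforms.mask_userid.replacement": "******",
    "transforms.mask_userid.predicate": "tombstone_records",
    # Assumption: negate is set so the mask applies to records that are NOT tombstones.
    "transforms.mask_userid.negate": "true",
    "predicates": "tombstone_records",
    "predicates.tombstone_records.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
}

connector_config = {**base_config, **smt_config}
config_json = json.dumps(connector_config, indent=2)
print(config_json)
```

The printed JSON can be saved to a file and used as the configuration file for the CLI.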

Failed records

Note the following troubleshooting information:

  • For source connectors, if an SMT is misconfigured, the connector fails.
  • For sink connectors, the connector continues running. Failed records are sent to the dead letter queue (DLQ). The DLQ shows the full stack trace for a failed record.

Note

See Transform alias validation for additional information.

SMTs and Data Preview

For SMTs, data preview shows the original Kafka record plus one record for each transformation, applied in sequence. For this reason, if you configure n transforms, data preview creates n + 1 records. Keep this in mind when you preview transformations.

In the following example, a source connector is configured with two SMTs. Note that the transformation steps in the figures are represented by current_step:n out of total_step:n. The entries transformation_name and transformation_type show what transformation is being applied at the step.

  1. The first record is the actual source record received by the connector. No transformations are applied yet.

    Figure: Single Message Transforms and Data Preview 1 of 3 (no transformations applied)

  2. The first transformation (ValueToKey) is applied to the Kafka record. The Kafka record key is set to the gender field value.

    Figure: Single Message Transforms and Data Preview 2 of 3 (first transformation applied)

  3. The second transformation (MaskField$Value) is applied to the Kafka record. The Kafka record value is transformed and the gender field value is masked.

    Figure: Single Message Transforms and Data Preview 3 of 3 (second transformation applied)
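The n + 1 relationship can be reproduced with a small Python sketch. It is not how data preview is implemented: the snapshot layout, the step numbering starting at 0, and the two transform functions are assumptions made for illustration. Only the field names current_step, total_step, and transformation_name are taken from the description above.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]


def value_to_key(record: Record) -> Record:
    # Stand-in for ValueToKey on the gender field: the key is built from that field.
    return {**record, "key": {"gender": record["value"]["gender"]}}


def mask_gender(record: Record) -> Record:
    # Stand-in for MaskField$Value without a replacement: a string becomes empty.
    return {**record, "value": {**record["value"], "gender": ""}}


def preview(record: Record, transforms: List[Tuple[str, Callable[[Record], Record]]]) -> List[Dict[str, Any]]:
    total = len(transforms)
    # Snapshot 0 is the record as received, before any transformation.
    snapshots = [{"current_step": 0, "total_step": total,
                  "transformation_name": None, "record": record}]
    for step, (name, transform) in enumerate(transforms, start=1):
        record = transform(record)
        snapshots.append({"current_step": step, "total_step": total,
                          "transformation_name": name, "record": record})
    return snapshots


source_record = {
    "key": None,
    "value": {"registertime": 1499746213074, "userid": "User_5",
              "regionid": "Region_4", "gender": "MALE"},
}

snapshots = preview(source_record, [("ValueToKey", value_to_key),
                                    ("MaskField$Value", mask_gender)])
print(len(snapshots))  # 3
```

Two configured transforms yield three snapshots: the untouched record, the record after ValueToKey, and the record after MaskField$Value.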

Transform alias validation

Several managed connectors already have internal transformations. If you add a transformation alias that conflicts with the alias present in the connector’s internal configuration template, Connect throws a validation error similar to the following example:

Invalid value [internalxform, ..., internalxform] for configuration transforms: Duplicate alias provided.

In the error message above, internalxform is the internal alias present in the configuration template that conflicts with the added alias. You can’t use the following aliases in your transformation.

Managed Connector                            Connector plugin   Internal alias
MySQL CDC Source (Debezium)                  MySqlCDC           unwrap
Postgres CDC Source (Debezium)               PostgresCDC        unwrap
Microsoft SQL Server CDC Source (Debezium)   SqlServerCDC       unwrap
Google BigQuery Sink                         BigQuerySink       requireMapTransform
HTTP Sink                                    HttpSink           requireTimestampTransform
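A configuration could be checked for these conflicts before it is submitted. The following Python sketch assumes that the connector plugin name in the table matches the connector.class value in the configuration. The helper name conflicting_aliases is hypothetical.

```python
from typing import Dict, List, Set

# Internal aliases from the table above, keyed by connector plugin.
INTERNAL_ALIASES: Dict[str, Set[str]] = {
    "MySqlCDC": {"unwrap"},
    "PostgresCDC": {"unwrap"},
    "SqlServerCDC": {"unwrap"},
    "BigQuerySink": {"requireMapTransform"},
    "HttpSink": {"requireTimestampTransform"},
}


def conflicting_aliases(connector_class: str, transforms: str) -> List[str]:
    # The transforms property is a comma-separated list of aliases.
    aliases = [alias.strip() for alias in transforms.split(",") if alias.strip()]
    reserved = INTERNAL_ALIASES.get(connector_class, set())
    return [alias for alias in aliases if alias in reserved]


print(conflicting_aliases("MySqlCDC", "unwrap, mask_userid"))       # ['unwrap']
print(conflicting_aliases("DatagenSource", "unwrap, mask_userid"))  # []
```

An empty list means none of the configured aliases collide with the internal aliases listed for that connector.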

List of available SMTs

Transform               Description
Cast                    Cast fields or the entire key or value to a specific type (for example, to force an integer field to a smaller width).
Drop                    Drop either a key or a value from a record and set it to null.
ExtractField            Extract the specified field from a Struct when a schema is present, or a Map in the case of schemaless data. Any null values are passed through unmodified.
ExtractTopic            Replace the record topic with a new topic derived from its key or value.
Filter (Apache Kafka)   Drop all records. Designed to be used in conjunction with a Predicate.
Filter (Confluent)      Include or drop records that match a configurable filter.condition.
Flatten                 Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character.
HoistField              Wrap data using the specified field name in a Struct when a schema is present, or a Map in the case of schemaless data.
InsertField             Insert a field using attributes from the record metadata or a configured static value.
MaskField               Mask specified fields with a valid null value for the field type (for example, 0, false, or an empty string) or with a configured replacement.
MessageTimestampRouter  Update the record’s topic field as a function of the original topic value and the record’s timestamp field.
RegexRouter             Not currently available for managed connectors. Update the record topic using the configured regular expression and replacement string.
ReplaceField            Filter or rename fields.
SetSchemaMetadata       Set the schema name, version, or both on the record’s key or value schema.
TimestampConverter      Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types.
TimestampRouter         Update the record’s topic field as a function of the original topic value and the record timestamp.
TombstoneHandler        Manage tombstone records. A tombstone record is defined as a record with the entire value field being null, whether or not it has a value schema.
ValueToKey              Replace the record key with a new key formed from a subset of fields in the record value.

SMT examples

The following examples show connector configuration and transformation examples for source and sink connectors. The examples show configurations for the Datagen Source and the Google Cloud Functions Sink connectors.

Note

  • Several SMTs (for example, Cast) may be applied to either the record Value or the record Key. Using Cast as the example, the connector applies org.apache.kafka.connect.transforms.Cast$Value to the Kafka record value. It applies org.apache.kafka.connect.transforms.Cast$Key to the Kafka record key.
  • See the SMT documentation for SMT definitions, configuration property values, and additional examples.

Cast

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-cast",
  "max.interval": "3000",
  "name": "DatagenSourceSmtCast",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "castValues",
  "transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Drop

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-drop",
  "max.interval": "3000",
  "name": "DatagenSourceSmtDrop",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Extract Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "extractAddress",
  "transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractAddress.field": "address"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
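As an illustration of what the configuration above is expected to do to this record, the following Python sketch imitates ExtractField$Value on a plain dict. It is a simplified model, not the SMT implementation, and the function name extract_field is hypothetical.

```python
from typing import Any, Dict, Optional


def extract_field(value: Optional[Dict[str, Any]], field: str) -> Any:
    # Null values pass through unmodified; otherwise the named field replaces the value.
    if value is None:
        return None
    return value[field]


order = {
    "ordertime": 1512446289869,
    "orderid": 417794,
    "itemid": "Item_430",
    "orderunits": 5.085317150755766,
    "address": {"city": "City_19", "state": "State_88", "zipcode": 72688},
}

extracted = extract_field(order, "address")
print(extracted)  # {'city': 'City_19', 'state': 'State_88', 'zipcode': 72688}
```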

Extract Topic

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-topic",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractTopic",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setTopic",
  "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.setTopic.field": "itemid"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Filter (Apache Kafka)

Note

This example also shows the RecordIsTombstone predicate.

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-ak",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterAk",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "predicates": "isNull",
  "predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms": "dropValue, filterNull",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterNull.predicate": "isNull"
}

Transformation example:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}
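The interplay of the two transforms and the predicate in this configuration can be modeled roughly as follows. This is a simplified Python sketch with hypothetical function names, in which a dropped record is represented by None. It suggests that, with this configuration, every record has its value dropped first and is then removed by the filter.

```python
from typing import Any, Dict, Optional

Record = Dict[str, Any]


def drop_value(record: Record) -> Record:
    # Stand-in for Drop$Value: the record value is set to null.
    return {**record, "value": None}


def is_tombstone(record: Record) -> bool:
    # Stand-in for the RecordIsTombstone predicate.
    return record["value"] is None


def filter_if(record: Record, predicate) -> Optional[Record]:
    # Stand-in for Filter with a predicate: matching records are dropped (None).
    return None if predicate(record) else record


order = {"key": None, "value": {"orderid": 417794, "itemid": "Item_430"}}

after_drop = drop_value(order)
after_filter = filter_if(after_drop, is_tombstone)
untouched = filter_if(order, is_tombstone)

print(after_drop["value"])  # None
print(after_filter)         # None, the record is dropped
```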

Filter (Confluent)

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-cp",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterCp",
  "output.data.format": "JSON",
  "quickstart": "RATINGS",
  "tasks.max": "1",
  "transforms": "filterValue",
  "transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
  "transforms.filterValue.filter.type": "include",
  "transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}

Transformation example:

[
  {
    "rating_id": 140,
    "user_id": 3,
    "stars": 4,
    "route_id": 7425,
    "rating_time": 1669,
    "channel": "ios",
    "message": "thank you for the most friendly, helpful experience today at your new lounge"
  },
  {
    "rating_id": 491,
    "user_id": 7,
    "stars": 4,
    "route_id": 3302,
    "rating_time": 5881,
    "channel": "iOS-test",
    "message": "more peanuts please"
  }
]
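The effect of the include filter on these two records can be approximated in Python. The sketch below does not evaluate JSONPath. It hard-codes the equivalent of the condition `@.channel == 'ios'` as a plain comparison, which is an assumption about how the condition behaves (an exact, case-sensitive match). The message text is shortened.

```python
from typing import Any, Dict, List

ratings: List[Dict[str, Any]] = [
    {"rating_id": 140, "user_id": 3, "stars": 4, "route_id": 7425,
     "rating_time": 1669, "channel": "ios", "message": "thank you"},
    {"rating_id": 491, "user_id": 7, "stars": 4, "route_id": 3302,
     "rating_time": 5881, "channel": "iOS-test", "message": "more peanuts please"},
]


def matches_condition(value: Dict[str, Any]) -> bool:
    # Python equivalent of the filter condition on the channel field.
    return value.get("channel") == "ios"


def filter_records(values: List[Dict[str, Any]], filter_type: str) -> List[Dict[str, Any]]:
    # "include" keeps matching records; "exclude" drops them.
    keep_matching = filter_type == "include"
    return [v for v in values if matches_condition(v) == keep_matching]


included = filter_records(ratings, "include")
print([r["rating_id"] for r in included])  # [140]
```

Only the record with channel ios is kept. The record with channel iOS-test does not match and is dropped.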

Flatten

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-flatten",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFlatten",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}

Transformation example:

{
  "ordertime": 1491310657544,
  "orderid": 9,
  "itemid": "Item_826",
  "orderunits": 4.188698361592631,
  "address": {
    "city": "City_73",
    "state": "State_47",
    "zipcode": 54450
  }
}
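The following Python sketch shows the kind of output the Flatten configuration above is expected to produce for this record. It is a simplified model on plain dicts, not the SMT implementation, and the function name flatten is hypothetical.

```python
from typing import Any, Dict


def flatten(value: Dict[str, Any], delimiter: str = "_", prefix: str = "") -> Dict[str, Any]:
    flat: Dict[str, Any] = {}
    for name, field in value.items():
        # Nested field names are joined to the parent name with the delimiter.
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(field, dict):
            flat.update(flatten(field, delimiter, full_name))
        else:
            flat[full_name] = field
    return flat


order = {
    "ordertime": 1491310657544,
    "orderid": 9,
    "itemid": "Item_826",
    "orderunits": 4.188698361592631,
    "address": {"city": "City_73", "state": "State_47", "zipcode": 54450},
}

flattened = flatten(order, delimiter="_")
print(sorted(flattened))
# ['address_city', 'address_state', 'address_zipcode', 'itemid', 'orderid', 'ordertime', 'orderunits']
```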

Hoist Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-hoist-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtHoistField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "hoist",
  "transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
  "transforms.hoist.field": "wrapperField"
}

Transformation example:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}
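A minimal Python model of the HoistField configuration above, applied to this record, might look like the following. The function name hoist_field is hypothetical and the sketch works on plain dicts only.

```python
from typing import Any, Dict


def hoist_field(value: Any, field: str) -> Dict[str, Any]:
    # The whole value is wrapped under a single field with the configured name.
    return {field: value}


user = {
    "registertime": 1506959883575,
    "userid": "User_2",
    "regionid": "Region_1",
    "gender": "MALE",
}

hoisted = hoist_field(user, "wrapperField")
print(list(hoisted))  # ['wrapperField']
```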

Insert Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-insert-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtInsertField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "insert",
  "transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insert.partition.field": "PartitionField",
  "transforms.insert.static.field": "InsertedStaticField",
  "transforms.insert.static.value": "SomeValue",
  "transforms.insert.timestamp.field": "TimestampField",
  "transforms.insert.topic.field": "TopicField"
}

Transformation example:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}

Mask Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-mask-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtMaskField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "gender",
  "transforms.mask.replacement": "REDACTED"
}

Transformation example:

{
  "registertime": 1499746213074,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "MALE"
}
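The masking configured above can be approximated with the following Python sketch, which replaces the listed fields with the configured replacement string. It is a simplified model for string fields on plain dicts, and the function name mask_fields is hypothetical.

```python
from typing import Any, Dict, List


def mask_fields(value: Dict[str, Any], fields: List[str], replacement: str) -> Dict[str, Any]:
    # Only the listed fields are replaced; all other fields are copied as-is.
    return {name: (replacement if name in fields else field)
            for name, field in value.items()}


user = {
    "registertime": 1499746213074,
    "userid": "User_5",
    "regionid": "Region_4",
    "gender": "MALE",
}

masked = mask_fields(user, ["gender"], "REDACTED")
print(masked["gender"])  # REDACTED
```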

Message Timestamp Router

Configuration examples:

Not currently available

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
  "topic": "msg_timestamp_router_topic",
  "partition": 0,
  "offset": 812925,
  "timestamp": 1628486671963
}

Replace Field

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-replace-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtReplaceField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "replacefield",
  "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.replacefield.exclude": "userid",
  "transforms.replacefield.include": "regionid",
  "transforms.replacefield.renames": "regionid:Region_Id"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "regionid": "Region_3",
    "ModifiedBy": "XYZ"
  }
}
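The combined effect of exclude, include, and renames in the configuration above can be modeled as follows. This Python sketch assumes that a field is kept when it is not excluded and, if an include list is set, is on that list, and that renames are applied to the fields that remain. The function name replace_field is hypothetical.

```python
from typing import Any, Dict, List


def replace_field(value: Dict[str, Any], exclude: List[str],
                  include: List[str], renames: Dict[str, str]) -> Dict[str, Any]:
    result: Dict[str, Any] = {}
    for name, field in value.items():
        kept = name not in exclude and (not include or name in include)
        if kept:
            # Rename the field if a mapping exists, otherwise keep its name.
            result[renames.get(name, name)] = field
    return result


value = {"userid": 1234, "regionid": "Region_3", "ModifiedBy": "XYZ"}

replaced = replace_field(value, exclude=["userid"], include=["regionid"],
                         renames={"regionid": "Region_Id"})
print(replaced)  # {'Region_Id': 'Region_3'}
```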

Set Schema Metadata

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-set-schema-metadata",
  "max.interval": "3000",
  "name": "DatagenSourceSmtSetSchemaMetadata",
  "output.data.format": "AVRO",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setSchemaMetadata",
  "transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
  "transforms.setSchemaMetadata.schema.name": "schema_name",
  "transforms.setSchemaMetadata.schema.version": "12"
}

Transformation example:

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}

Timestamp Converter

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-ts-converter",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTimestampConverter",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "tsConverter",
  "transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.tsConverter.field": "ordertime",
  "transforms.tsConverter.format": "yyyy-MM-dd",
  "transforms.tsConverter.target.type": "string"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "ordertime": "2021-08-04",
    "orderid": "ABC",
    "itemid": "XYZ"
  }
}

Timestamp Router

Configuration examples:

Not currently available

Transformation example:

{
  "key": "User_8",
  "value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
  "topic": "topic"
}

Tombstone Handler

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-tombstone-handler",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTombstoneHandler",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue,tombstoneFail",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
  "transforms.tombstoneFail.behavior": "fail"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": null
}

Value To Key

Configuration examples:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-value-to-key",
  "max.interval": "3000",
  "name": "DatagenSourceSmtValueToKey",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "valueToKey",
  "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.valueToKey.fields": "registertime, userid"
}

Transformation example:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "registertime": "ABC",
    "ModifiedBy": "XYZ"
  }
}
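For this record, the ValueToKey configuration above would be expected to replace the key with the two listed fields of the value. The following Python sketch models that on plain dicts. The function name value_to_key is hypothetical.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def value_to_key(record: Record, fields: List[str]) -> Record:
    # The new key contains only the listed fields, copied from the value.
    new_key = {name: record["value"][name] for name in fields}
    return {**record, "key": new_key}


record = {
    "topic": "TestTopic",
    "key": {"test": "value"},
    "value": {"userid": 1234, "registertime": "ABC", "ModifiedBy": "XYZ"},
}

rekeyed = value_to_key(record, ["registertime", "userid"])
print(rekeyed["key"])  # {'registertime': 'ABC', 'userid': 1234}
```

The record value is left unchanged; only the key is replaced.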