Configure Single Message Transforms for Kafka Connectors in Confluent Cloud¶
Confluent Cloud fully-managed connectors can be configured with one or more single message transforms (SMTs) to make simple and lightweight modifications to message values, keys, and headers. This can be convenient for inserting fields, masking information, event routing, and other minor data adjustments. SMTs can be used with sink and source connectors.
When multiple SMTs are used with a source connector, Connect passes each source record produced by the connector through the first SMT, which makes its modifications. The updated source record is then passed to the next SMT in the chain. This sequence continues for any additional SMTs in the chain. The final updated source record is converted to binary format (bytes) and written to Apache Kafka®.
When multiple SMTs are used with a sink connector, Connect first reads the record from Kafka and converts the bytes to a sink record. Connect then passes the record through the first SMT, which makes its modifications. The updated sink record is then passed to the next SMT in the chain. This continues for the remaining SMTs. The final updated sink record is then passed to the sink connector for processing.
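The chaining behavior described above can be modeled in a few lines. The following is a minimal Python sketch, not Connect's implementation: records are plain dictionaries, and `insert_source` and `mask_userid` are hypothetical stand-ins for real SMTs.

```python
from typing import Callable, Optional

Record = dict
Transform = Callable[[Record], Optional[Record]]


def apply_chain(record: Record, transforms: list[Transform]) -> Optional[Record]:
    """Pass a record through each transform in order, as a chain of SMTs would."""
    current: Optional[Record] = record
    for transform in transforms:
        if current is None:  # an earlier transform dropped the record
            return None
        current = transform(current)
    return current


# Hypothetical stand-ins for SMTs. Each returns a new record and leaves its input untouched.
def insert_source(record: Record) -> Record:
    return {**record, "value": {**record["value"], "source": "datagen"}}


def mask_userid(record: Record) -> Record:
    return {**record, "value": {**record["value"], "userid": "******"}}


original = {"key": None, "value": {"userid": "User_2", "regionid": "Region_1"}}
result = apply_chain(original, [insert_source, mask_userid])
print(result)
```

Because each step receives the output of the previous step, reordering the list can change the final record, which is why the order of the SMTs in the chain matters.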
Note the following additional details when using this feature:
- SMTs are very useful, but should only be used for simple data transformations. More sophisticated transformations and data integrations should be handled using Flink SQL.
- There is currently a limit of 10 SMT instances per connector.
- See SMT Limitations for additional information.
How to use SMTs with the Cloud Console¶
The following steps show how to set up an SMT using the Confluent Cloud Console. The steps use the Datagen Source Connector for Confluent Cloud.
Note
The connector applies transforms in the order in which they are created.
Add transforms¶
Go to the advanced configurations section of the connector UI and click Add SMT.
Accept the default transform name or enter a new name in the Transforms name field. For example, if you are masking a customer ID using the transform MaskField$Value, you could enter the name mask_userid.
After you select the transform, additional property fields are displayed. The following properties are displayed for MaskField$Value:
- fields (required): Enter the names of the record fields you want to mask. In the example, the field userid will be masked.
- replacement (optional): This field allows you to set a replacement string. In the example, the replacement for the userid will show ******.
Note
SMT configuration properties vary depending on the selected transform type. For transform configuration properties and descriptions, see the SMT docs for the transform you want to apply.
After you have completed the first transform, you can click Add new SMT to add another SMT. If you are done adding SMTs and want to add a predicate to an SMT, click Add Predicate. For details about adding predicates, see Add, link, and delete predicates.
When you are done adding SMTs, click Continue and complete any remaining connector startup tasks.
(Optional) Click Request preview and verify that the data output in the "record" section is what you want. The preview will take a few minutes to generate. If the data preview shows what you want, launch the connector.
Add, link, and delete predicates¶
When you apply a predicate to a transform, the predicate instructs the connector to apply the transform conditionally. In the predicate, you specify the predicate condition that the connector uses to evaluate each record that it processes.
The following predicate condition types are provided:
- HasHeaderKey
- RecordIsTombstone
- TopicNameMatches
When a record is evaluated, the connector first checks the record against the predicate condition. If the condition is true for the record, the connector applies the transform to the record. Records that do not match the predicate are passed through unmodified. The Negate Predicate option inverts the predicate. Setting this option to True applies the transform only to records that do not match the defined predicate condition.
In the example shown below, the predicate TopicNameMatches is selected. The regular expression (regex) pattern used is my-prefix-.*. Negate Predicate is set to False (the default). This predicate configuration causes the connector to apply the transform only to records from topics whose names match the pattern, that is, topic names that begin with my-prefix-.
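The interaction between the predicate condition and the Negate Predicate option can be sketched as follows. This is an illustrative Python model, not Connect code. It assumes the regex must match the entire topic name, and it uses Python's `re` module, whose syntax differs from Java regex in some advanced cases. The function names are hypothetical.

```python
import re


def topic_name_matches(topic: str, pattern: str) -> bool:
    """Model of a TopicNameMatches condition: the whole topic name must match the regex."""
    return re.fullmatch(pattern, topic) is not None


def should_apply(topic: str, pattern: str, negate: bool = False) -> bool:
    """Decide whether the linked transform runs for a record read from `topic`."""
    matched = topic_name_matches(topic, pattern)
    return not matched if negate else matched


for topic in ("my-prefix-orders", "orders", "test-my-prefix-orders"):
    print(topic, should_apply(topic, r"my-prefix-.*"))
```

With Negate Predicate set to True, the same pattern selects every topic that does not begin with my-prefix-.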
When you create a predicate, the predicate is linked to the first transform it was created from. You can link additional SMTs to use the same predicate. When you delete a predicate, you have the option to keep transforms linked to the predicate, or delete the predicate and any linked transforms.
For additional configuration examples showing how to use predicates, see Predicate Examples.
How to use SMTs with the Confluent CLI¶
Refer to the following docs for creating the JSON configuration file to load using the CLI:
- The managed connector docs provide the configuration properties and a JSON file example for each connector.
- The SMT docs provide the transformation configuration properties that you need to add to the basic connector configuration.
- Refer to the SMT examples for configuration and transformation examples.
- For examples showing how to use predicates, see Predicate Examples.
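As a rough illustration of how the transformation properties are added to a basic connector configuration, the following Python sketch merges SMT settings into a base configuration and prints the resulting JSON. The helper `with_transforms` is hypothetical. The property layout (`transforms`, `transforms.<alias>.type`, and so on) and the base values follow the Datagen Source examples in this document.

```python
import json


def with_transforms(base_config: dict, transforms: dict) -> dict:
    """Merge SMT settings into a connector config using the transforms.<alias>.<property> layout."""
    config = dict(base_config)
    # The order of aliases in this list is the order in which the SMTs are applied.
    config["transforms"] = ",".join(transforms)
    for alias, settings in transforms.items():
        for prop, value in settings.items():
            config[f"transforms.{alias}.{prop}"] = value
    return config


base = {
    "connector.class": "DatagenSource",
    "name": "DatagenSourceSmtMaskField",
    "kafka.api.key": "${KEY}",
    "kafka.api.secret": "${SECRET}",
    "kafka.topic": "datagen-source-smt-mask-field",
    "output.data.format": "JSON",
    "quickstart": "USERS",
    "tasks.max": "1",
}
smts = {
    "mask_userid": {
        "type": "org.apache.kafka.connect.transforms.MaskField$Value",
        "fields": "userid",
        "replacement": "******",
    }
}
config_json = json.dumps(with_transforms(base, smts), indent=2)
print(config_json)
```

The printed JSON can be saved to a file and loaded with the CLI as described in the managed connector docs.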
Failed records¶
For source connectors, if an SMT is misconfigured, the connector fails. Note the following troubleshooting information:
- You can check the fully-managed connector data output preview. This shows the full stack trace for a failed record. See SMTs and Data Preview for what to look for when checking the output preview.
- You can view connector events to identify the root cause for an exception.
For sink connectors, the connector continues running. Failed records are sent to the dead letter queue (DLQ). The DLQ shows the full stack trace for a failed record. For details, see View Connector Dead Letter Queue Errors in Confluent Cloud.
Note
See Insert Field transformation for additional information.
SMTs and Data Preview¶
For SMTs, data preview creates one record for each transformation applied to a Kafka record, plus one record for the original, untransformed Kafka record. For multiple transformations, each record represents one sequential transformation applied to the Kafka record. For this reason, if you configure n transforms, data preview creates n + 1 records. Keep this in mind when you preview transformations.
In the following example, a source connector is configured with two SMTs. Note that the transformation steps in the figures are represented by current_step:n out of total_step:n. The entries transformation_name and transformation_type show what transformation is being applied at the step.
The first record is the actual source record received by the connector. No transformations are applied yet.
The first transformation (ValueToKey) is applied to the Kafka record. The Kafka record key is set as the gender field value.
The second transformation (MaskField$Value) is applied to the Kafka record. The Kafka record value is transformed and the gender field value is removed.
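The n + 1 relationship in the two-SMT example above can be modeled with a short Python sketch. It is only an illustration of the counting, not the preview implementation: the two transform functions are simplified stand-ins, the aliases `valueToKey` and `mask` are hypothetical, and numbering the untransformed record as step 0 is an assumption.

```python
def preview_records(record: dict, transforms: list) -> list:
    """Return the original record plus one snapshot per transform (n + 1 entries)."""
    total = len(transforms)
    snapshots = [{"current_step": 0, "total_step": total,
                  "transformation_name": None, "record": record}]
    current = record
    for step, (name, transform) in enumerate(transforms, start=1):
        current = transform(current)
        snapshots.append({"current_step": step, "total_step": total,
                          "transformation_name": name, "record": current})
    return snapshots


# Simplified stand-ins for the two SMTs in the example.
def value_to_key(record: dict) -> dict:
    return {**record, "key": record["value"]["gender"]}


def mask_gender(record: dict) -> dict:
    # Masks the string field with an empty string.
    return {**record, "value": {**record["value"], "gender": ""}}


source = {"key": None, "value": {"userid": "User_2", "gender": "MALE"}}
steps = preview_records(source, [("valueToKey", value_to_key), ("mask", mask_gender)])
for entry in steps:
    print(entry["current_step"], entry["transformation_name"], entry["record"])
```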
SMT Limitations¶
Note the following limitations.
Insert Field transformation¶
Note the following when configuring the InsertField SMT:
- The transformation is ignored if the initial value of key/value is null.
Transform alias validation¶
Several fully-managed connectors already have internal transformations. If you add a transformation alias that conflicts with the alias present in the connector’s internal configuration template, Connect throws a validation error similar to the following example:
Invalid value [internalxform, ..., internalxform] for configuration transforms: Duplicate alias provided.
In the error message above, internalxform is the internal alias present in the configuration template that conflicts with the added alias. You can’t use the following aliases in your transformations for the listed connectors.
Managed Connector | Connector plugin | Internal alias |
---|---|---|
Amazon S3 Sink | S3_SINK | requireTimestampTransform |
Google BigQuery Sink | BigQuerySink | requireMapTransform |
HTTP Sink | HttpSink | requireTimestampTransform |
Microsoft SQL Server CDC Source (Debezium) [Legacy] | SqlServerCdcSource | unwrap |
Microsoft SQL Server CDC Source V2 (Debezium) | SqlServerCdcSourceV2 | unwrap |
MySQL CDC Source (Debezium) [Legacy] | MySqlCdcSource | unwrap |
MySQL CDC Source V2 (Debezium) | MySqlCdcSourceV2 | unwrap |
Postgres CDC Source (Debezium) [Legacy] | PostgresCdcSource | unwrap |
Postgres CDC Source V2 (Debezium) | PostgresCdcSourceV2 | unwrap |
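A configuration can be checked against this table before it is submitted. The following Python sketch is a hypothetical pre-check, not part of any Confluent tool. It assumes the value of `connector.class` is the name shown in the Connector plugin column.

```python
# Internal aliases per connector plugin, copied from the table above.
INTERNAL_ALIASES = {
    "S3_SINK": "requireTimestampTransform",
    "BigQuerySink": "requireMapTransform",
    "HttpSink": "requireTimestampTransform",
    "SqlServerCdcSource": "unwrap",
    "SqlServerCdcSourceV2": "unwrap",
    "MySqlCdcSource": "unwrap",
    "MySqlCdcSourceV2": "unwrap",
    "PostgresCdcSource": "unwrap",
    "PostgresCdcSourceV2": "unwrap",
}


def conflicting_aliases(config: dict) -> list:
    """Return the transform aliases in `config` that collide with the plugin's internal alias."""
    reserved = INTERNAL_ALIASES.get(config.get("connector.class", ""))
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    return [a for a in aliases if a == reserved]


print(conflicting_aliases({
    "connector.class": "S3_SINK",
    "transforms": "mask, requireTimestampTransform",
}))
```

The sketch does not model the exception for the Debezium connectors that is described under Debezium transformations.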
Debezium transformations¶
The ExtractNewRecordState and EventRouter (Debezium) SMTs are available for the following connectors:
- Microsoft SQL Server CDC Source (Debezium) [Legacy]
- Microsoft SQL Server CDC Source V2 (Debezium)
- MySQL CDC Source (Debezium) [Legacy]
- MySQL CDC Source V2 (Debezium)
- Postgres CDC Source (Debezium) [Legacy]
- Postgres CDC Source V2 (Debezium)
Note the following when configuring the ExtractNewRecordState SMT:
- You must set the configuration property after.state.only to false.
- Only after setting after.state.only to false in the connector UI will you be able to set the transform alias to unwrap.
Unsupported transformations¶
- The following SMTs are not currently supported for fully-managed connectors:
  - RegexRouter: For source connectors, the alternative is to use the TopicRegexRouter SMT.
  - InsertHeader
  - DropHeaders
  - ByLogicalTableRouter
- Certain sink connectors do not support the following transformations:
  - org.apache.kafka.connect.transforms.TimestampRouter
  - io.confluent.connect.transforms.MessageTimestampRouter
  - io.confluent.connect.transforms.ExtractTopic$Header
  - io.confluent.connect.transforms.ExtractTopic$Key
  - io.confluent.connect.transforms.ExtractTopic$Value
  - io.confluent.connect.cloud.transforms.TopicRegexRouter
- Certain source connectors do not support the following transformations:
  - org.apache.kafka.connect.transforms.ValueToKey
  - org.apache.kafka.connect.transforms.HoistField$Value
SMT list¶
Note
Not all of the listed SMTs are supported by all fully-managed connectors. See Unsupported transformations.
Transform | Description |
---|---|
Cast | Cast fields or the entire key or value to a specific type (for example, to force an integer field to a smaller width). |
Drop | Drop either a key or a value from a record and set it to null. |
DropHeaders | Not currently available for managed connectors. Drop one or more headers from each record. |
EventRouter | Only available for managed Debezium connectors. Route Debezium outbox events using a regex configuration option. |
ExtractField | Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. Any null values are passed through unmodified. |
ExtractTopic | Replace the record topic with a new topic derived from its key or value. |
Filter (Apache Kafka) | Drop all records. Designed to be used in conjunction with a Predicate. |
Filter (Confluent) | Include or drop records that match a configurable filter.condition. |
Flatten | Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character. |
GzipDecompress | Not currently available for managed connectors. Gzip-decompress the entire byteArray key or value input. |
HeaderFrom | Not currently available for managed connectors. Moves or copies fields in a record key or value into the record’s header. |
HoistField | Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data. |
InsertField | Insert field using attributes from the record metadata or a configured static value. |
InsertHeader | Not currently available for managed connectors. Insert a literal value as a record header. |
MaskField | Mask specified fields with a valid null value for the field type. |
MessageTimestampRouter | Update the record’s topic field as a function of the original topic value and the record’s timestamp field. |
RegexRouter | Not currently available for managed connectors. Update the record topic using the configured regular expression and replacement string. |
ReplaceField (Apache Kafka) | Filter or rename fields. |
ReplaceField (Confluent) | Filter or rename nested fields. |
SetSchemaMetadata | Set the schema name, version, or both on the record’s key or value schema. |
TimestampConverter | Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types. |
TimestampRouter | Update the record’s topic field as a function of the original topic value and the record timestamp. |
TombstoneHandler | Manage tombstone records. A tombstone record is defined as a record with the entire value field being null, whether or not it has ValueSchema. |
TopicRegexRouter | Only available for managed Source connectors. Update the record topic using the configured regular expression and replacement string. |
ValueToKey | Replace the record key with a new key formed from a subset of fields in the record value. |
SMT examples¶
The following examples show connector configuration and transformation examples for source and sink connectors. The examples show configurations for the Datagen Source and the Google Cloud Functions Sink connectors. See the SMT documentation for SMT definitions, configuration property values, and additional examples.
Note that several SMTs (for example, Cast) may be applied to either the record Value or the record Key. Using Cast as the example, the connector applies org.apache.kafka.connect.transforms.Cast$Value to the Kafka record value. It applies org.apache.kafka.connect.transforms.Cast$Key to the Kafka record key.
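The naming pattern can be expressed as a small helper. This Python sketch is purely illustrative, and `smt_class` is a hypothetical function name. It only builds the string used for a `transforms.<alias>.type` property.

```python
def smt_class(base_class: str, target: str) -> str:
    """Build the type value for a transform that has Key and Value variants."""
    if target not in ("Key", "Value"):
        raise ValueError("target must be 'Key' or 'Value'")
    return f"{base_class}${target}"


print(smt_class("org.apache.kafka.connect.transforms.Cast", "Value"))
print(smt_class("org.apache.kafka.connect.transforms.Cast", "Key"))
```

If you type these class names in a shell command, enclose them in single quotes so that the shell does not expand $Value or $Key as a variable.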
Cast¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-cast",
"max.interval": "3000",
"name": "DatagenSourceSmtCast",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "castValues",
"transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "${GCP_FUNCTION_NAME}",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtCast",
"project.id": "${PROJECT_ID}",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "castValues",
"transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}
Transformation example (record before the transform, then after):
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
{
"ordertime": 1512446289869,
"orderid": "417794",
"itemid": "Item_430",
"orderunits": 5,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688.00
}
}
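The way a spec string such as orderid:string, orderunits:int32 maps field names to target types can be sketched in Python. This is a simplified model under stated assumptions, not the Cast implementation: it handles only top-level fields, supports only the listed type names, and does not check numeric ranges. The names `CASTERS`, `parse_spec`, and `cast_value` are hypothetical.

```python
# Python stand-ins for a few Connect target types (assumption of this sketch).
CASTERS = {"string": str, "int32": int, "int64": int, "float32": float, "float64": float}


def parse_spec(spec: str) -> dict:
    """Turn 'field:type, field:type' into {field: type}."""
    pairs = (item.split(":") for item in spec.split(","))
    return {name.strip(): type_name.strip() for name, type_name in pairs}


def cast_value(value: dict, spec: str) -> dict:
    """Cast the top-level fields named in the spec and leave the other fields unchanged."""
    targets = parse_spec(spec)
    return {
        name: CASTERS[targets[name]](field) if name in targets else field
        for name, field in value.items()
    }


order = {"orderid": 417794, "itemid": "Item_430", "orderunits": 5.085317150755766}
print(cast_value(order, "orderid:string, orderunits:int32"))
```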
Drop¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-drop",
"max.interval": "3000",
"name": "DatagenSourceSmtDrop",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "dropValue",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtDrop",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "dropValue",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}
Transformation example (record value before the transform, then after):
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
null
Extract Field¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-extract-field",
"max.interval": "3000",
"name": "DatagenSourceSmtExtractField",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "extractAddress",
"transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractAddress.field": "address"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtExtractField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "extractAddress",
"transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractAddress.field": "address"
}
Transformation example (record before the transform, then after):
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
{
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
Extract Topic¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-extract-topic",
"max.interval": "3000",
"name": "DatagenSourceSmtExtractTopic",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "setTopic",
"transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.setTopic.field": "itemid"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtExtractTopic",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "setTopic",
"transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.setTopic.field": "itemid"
}
Transformation example (record before the transform, then after):
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
The record goes to topic Item_430 instead of the configured topic.
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
Filter (Apache Kafka)¶
Note
This example also uses a RecordIsTombstone predicate.
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-filter-ak",
"max.interval": "3000",
"name": "DatagenSourceSmtFilterAk",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"predicates": "isNull",
"predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms": "dropValue, filterNull",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterNull.predicate": "isNull"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtFilterAk",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"predicates": "isNull",
"predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms": "dropValue, filterNull",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterNull.predicate": "isNull"
}
Transformation example (record before the transforms, then the result):
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
Record is dropped.
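The reason the record is dropped can be traced with a small Python model of this configuration. It is a sketch under simplifying assumptions: records are dictionaries, a dropped record is represented by `None`, and the function names are hypothetical stand-ins for Drop$Value, RecordIsTombstone, and Filter.

```python
def drop_value(record: dict) -> dict:
    """Stand-in for Drop$Value: set the record value to null."""
    return {**record, "value": None}


def is_tombstone(record: dict) -> bool:
    """Stand-in for RecordIsTombstone: true when the record value is null."""
    return record["value"] is None


def filter_if(predicate):
    """Stand-in for Filter linked to a predicate: drop the record when the predicate is true."""
    def transform(record: dict):
        return None if predicate(record) else record
    return transform


def run(record: dict, transforms: list):
    for transform in transforms:
        record = transform(record)
        if record is None:  # the record was filtered out
            return None
    return record


order = {"key": None, "value": {"orderid": 417794, "itemid": "Item_430"}}
print(run(order, [drop_value, filter_if(is_tombstone)]))
```

In this model, dropValue turns every record into a tombstone, so the filterNull step then removes every record. Without the dropValue step, only records that already have a null value would be filtered.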
Filter (Confluent)¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-filter-cp",
"max.interval": "3000",
"name": "DatagenSourceSmtFilterCp",
"output.data.format": "JSON",
"quickstart": "RATINGS",
"tasks.max": "1",
"transforms": "filterValue",
"transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
"transforms.filterValue.filter.type": "include",
"transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtFilterCp",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "filterValue",
"transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
"transforms.filterValue.filter.type": "include",
"transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}
Transformation example (records before the transform, then after):
[
{
"rating_id": 140,
"user_id": 3,
"stars": 4,
"route_id": 7425,
"rating_time": 1669,
"channel": "ios",
"message": "thank you for the most friendly, helpful experience today at your new lounge"
},
{
"rating_id": 491,
"user_id": 7,
"stars": 4,
"route_id": 3302,
"rating_time": 5881,
"channel": "iOS-test",
"message": "more peanuts please"
}
]
[
{
"rating_id": 140,
"user_id": 3,
"stars": 4,
"route_id": 7425,
"rating_time": 1669,
"channel": "ios",
"message": "thank you for the most friendly, helpful experience today at your new lounge"
}
]
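The include behavior shown above can be approximated in Python. This sketch does not evaluate JSONPath. Instead, the condition `$[?(@.channel == 'ios')]` is hand-translated into a Python function, and `filter_records` is a hypothetical helper that models the include and exclude filter types.

```python
def filter_records(records: list, condition, filter_type: str = "include") -> list:
    """Keep (include) or drop (exclude) the records for which condition(record) is true."""
    if filter_type not in ("include", "exclude"):
        raise ValueError("filter_type must be 'include' or 'exclude'")
    keep_on_match = filter_type == "include"
    return [r for r in records if bool(condition(r)) == keep_on_match]


def channel_is_ios(record: dict) -> bool:
    # Hand-written equivalent of the condition $[?(@.channel == 'ios')].
    return record.get("channel") == "ios"


ratings = [
    {"rating_id": 140, "channel": "ios", "stars": 4},
    {"rating_id": 491, "channel": "iOS-test", "stars": 4},
]
print(filter_records(ratings, channel_is_ios, "include"))
```

The comparison is exact and case-sensitive in this model, which is why the record with channel iOS-test is not included.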
Flatten¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-flatten",
"max.interval": "3000",
"name": "DatagenSourceSmtFlatten",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtFlatten",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
Transformation example (record before the transform, then after):
{
"ordertime": 1491310657544,
"orderid": 9,
"itemid": "Item_826",
"orderunits": 4.188698361592631,
"address": {
"city": "City_73",
"state": "State_47",
"zipcode": 54450
}
}
{
"ordertime": 1491310657544,
"orderid": 9,
"itemid": "Item_826",
"orderunits": 4.188698361592631,
"address_city": "City_73",
"address_state": "State_47",
"address_zipcode": 54450
}
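The field naming used by this transformation can be reproduced with a short recursive function. The following Python sketch models only the name concatenation on schemaless, dictionary-shaped data. It is not the Flatten implementation, and `flatten` is a hypothetical helper.

```python
def flatten(value: dict, delimiter: str, prefix: str = "") -> dict:
    """Flatten nested dictionaries, joining the field names at each level with the delimiter."""
    flat = {}
    for name, field in value.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(field, dict):
            flat.update(flatten(field, delimiter, full_name))
        else:
            flat[full_name] = field
    return flat


order = {
    "ordertime": 1491310657544,
    "orderid": 9,
    "itemid": "Item_826",
    "orderunits": 4.188698361592631,
    "address": {"city": "City_73", "state": "State_47", "zipcode": 54450},
}
print(flatten(order, "_"))
```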
Hoist Field¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-hoist-field",
"max.interval": "3000",
"name": "DatagenSourceSmtHoistField",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "hoist",
"transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.hoist.field": "wrapperField"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtHoistField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "hoist",
"transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.hoist.field": "wrapperField"
}
Transformation example (record before the transform, then after):
{
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE"
}
{
"wrapperField": {
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE"
}
}
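Hoist Field and Extract Field are roughly inverse operations, which the following Python sketch illustrates for schemaless data. Both functions are hypothetical stand-ins that model only the wrapping and unwrapping of a value.

```python
def hoist_field(value, field: str) -> dict:
    """Wrap the whole value in a single field, as in the Hoist Field example above."""
    return {field: value}


def extract_field(value, field: str):
    """Return the named field of the value. Null values are passed through unmodified."""
    return None if value is None else value[field]


user = {"registertime": 1506959883575, "userid": "User_2", "regionid": "Region_1", "gender": "MALE"}
wrapped = hoist_field(user, "wrapperField")
print(wrapped)
print(extract_field(wrapped, "wrapperField"))
```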
Insert Field¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-insert-field",
"max.interval": "3000",
"name": "DatagenSourceSmtInsertField",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "insert",
"transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insert.partition.field": "PartitionField",
"transforms.insert.static.field": "InsertedStaticField",
"transforms.insert.static.value": "SomeValue",
"transforms.insert.timestamp.field": "TimestampField",
"transforms.insert.topic.field": "TopicField"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtInsertField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "insert",
"transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insert.offset.field": "offsetField",
"transforms.insert.partition.field": "partitionField",
"transforms.insert.static.field": "staticField",
"transforms.insert.static.value": "staticValue",
"transforms.insert.timestamp.field": "timestampField",
"transforms.insert.topic.field": "topicField"
}
Transformation example (record before the transform, then after):
{
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE"
}
{
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE",
"TopicField": "insert_topic",
"PartitionField": null,
"TimestampField": null,
"InsertedStaticField": "SomeValue"
}
Mask Field¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-mask-field",
"max.interval": "3000",
"name": "DatagenSourceSmtMaskField",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "gender",
"transforms.mask.replacement": "REDACTED"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtMaskField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-users-json",
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "gender",
"transforms.mask.replacement": "REDACTED"
}
Transformation example (record before the transform, then after):
{
"registertime": 1499746213074,
"userid": "User_5",
"regionid": "Region_4",
"gender": "MALE"
}
{
"registertime": 1499746213074,
"userid": "User_5",
"regionid": "Region_4",
"gender": "REDACTED"
}
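The effect of the fields and replacement properties in this example can be modeled as follows. This Python sketch is a simplified illustration for string fields in schemaless data, and `mask_fields` is a hypothetical helper rather than the MaskField implementation.

```python
def mask_fields(value: dict, fields: list, replacement: str) -> dict:
    """Replace the listed fields with the replacement string and keep the other fields as they are."""
    return {
        name: (replacement if name in fields else field)
        for name, field in value.items()
    }


user = {"registertime": 1499746213074, "userid": "User_5", "regionid": "Region_4", "gender": "MALE"}
print(mask_fields(user, ["gender"], "REDACTED"))
```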
Message Timestamp Router¶
Configuration example (sink connector):
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtMessageTimestampRouter",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "tsConverter,tsRouter",
"transforms.tsConverter.field": "ordertime",
"transforms.tsConverter.format": "yyyy-MM-dd",
"transforms.tsConverter.target.type": "string",
"transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsRouter.type": "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.tsRouter.message.timestamp.keys": "ordertime"
}
Transformation example (record before the transform, then after):
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "msg_timestamp_router_topic",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "msg_timestamp_router_topic-2021.09.03",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
Replace Field (Apache Kafka)¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-replace-field",
"max.interval": "3000",
"name": "DatagenSourceSmtReplaceField",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "replacefield",
"transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.replacefield.exclude": "userid",
"transforms.replacefield.include": "regionid",
"transforms.replacefield.renames": "regionid:Region_Id"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtReplaceField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "replaceField",
"transforms.replaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.replaceField.include": "regionid"
}
Transformation example (record before the transform, then after):
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"userid": 1234,
"regionid": "Region_3",
"ModifiedBy": "XYZ"
}
}
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"Region_Id": "Region_3"
}
}
Replace Field (Confluent)¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-replace-field",
"max.interval": "3000",
"name": "DatagenSourceSmtReplaceField",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "replacefield",
"transforms.replacefield.type": "io.confluent.connect.transforms.ReplaceField$Value",
"transforms.replacefield.exclude": "userid",
"transforms.replacefield.include": "regionid",
"transforms.replacefield.renames": "regionid:Region_Id"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtReplaceField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "replaceField",
"transforms.replaceField.type": "io.confluent.connect.transforms.ReplaceField$Value",
"transforms.replaceField.include": "regionid"
}
Transformation example (record before the transform, then after):
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"userid": 1234,
"regionid": "Region_3",
"ModifiedBy": "XYZ"
}
}
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"Region_Id": "Region_3"
}
}
Set Schema Metadata¶
Configuration examples (source connector first, then sink connector):
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-set-schema-metadata",
"max.interval": "3000",
"name": "DatagenSourceSmtSetSchemaMetadata",
"output.data.format": "AVRO",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "setSchemaMetadata",
"transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.setSchemaMetadata.schema.name": "schema_name",
"transforms.setSchemaMetadata.schema.version": "12"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "AVRO",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtSetSchemaMetadata",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "setSchemaMetadata",
"transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.setSchemaMetadata.schema.name": "schema_name",
"transforms.setSchemaMetadata.schema.version": "12"
}
Transformation example:
{
"connect.name": "ksql.users",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "users",
"namespace": "ksql",
"type": "record"
}
{
"connect.name": "schema_name",
"connect.version": 12,
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "schema_name",
"type": "record"
}
Timestamp Converter¶
Configuration examples:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-ts-converter",
"max.interval": "3000",
"name": "DatagenSourceSmtTimestampConverter",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "tsConverter",
"transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsConverter.field": "ordertime",
"transforms.tsConverter.format": "yyyy-MM-dd",
"transforms.tsConverter.target.type": "string"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtTimestampConverter",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "tsConverter",
"transforms.tsConverter.field": "ordertime",
"transforms.tsConverter.format": "yyyy-MM-dd",
"transforms.tsConverter.target.type": "string",
"transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value"
}
Transformation example:
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"ordertime": 1628035200000,
"orderid": "ABC",
"itemid": "XYZ"
}
}
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"ordertime": "2021-08-04",
"orderid": "ABC",
"itemid": "XYZ"
}
}
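The conversion shown above can be checked with a short Python sketch. It is an illustration rather than the transform's code: the helper name epoch_millis_to_date_string is hypothetical, the strftime pattern %Y-%m-%d stands in for the yyyy-MM-dd format, and the sketch assumes the timestamp is interpreted in UTC.

```python
from datetime import datetime, timezone


def epoch_millis_to_date_string(epoch_millis, fmt="%Y-%m-%d"):
    """Format a Unix epoch timestamp in milliseconds as a UTC date string."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime(fmt)


value = {"ordertime": 1628035200000, "orderid": "ABC", "itemid": "XYZ"}
# Only the configured field (ordertime) is converted; other fields are untouched.
value["ordertime"] = epoch_millis_to_date_string(value["ordertime"])
print(value)  # {'ordertime': '2021-08-04', 'orderid': 'ABC', 'itemid': 'XYZ'}
```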
Timestamp Router¶
Configuration examples:
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtTimestampRouter",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-users-json",
"transforms": "tsRouter",
"transforms.tsRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.tsRouter.timestamp.format": "yyyyMM",
"transforms.tsRouter.topic.format": "foo-${topic}-${timestamp}"
}
Transformation example:
{
"key": "User_8",
"value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
"topic": "topic"
}
{
"key": "User_8",
"value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
"topic": "foo-topic-202108"
}
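The topic name in the second record can be reproduced with the following Python sketch. It is only an approximation of the routing logic: the function name route_by_timestamp is hypothetical, the record timestamp 1628486671963 is an assumed value (the example above does not show one), the strftime pattern %Y%m stands in for a year-and-month timestamp format, and UTC is assumed.

```python
from datetime import datetime, timezone


def route_by_timestamp(topic, timestamp_millis,
                       topic_format="foo-${topic}-${timestamp}",
                       strftime_format="%Y%m"):
    """Build a new topic name from the original topic and the record timestamp."""
    moment = datetime.fromtimestamp(timestamp_millis / 1000, tz=timezone.utc)
    formatted = moment.strftime(strftime_format)
    # Substitute the two placeholders used by the topic format string.
    return topic_format.replace("${topic}", topic).replace("${timestamp}", formatted)


# Assumed record timestamp: a moment in August 2021.
print(route_by_timestamp("topic", 1628486671963))  # foo-topic-202108
```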
Tombstone Handler¶
Configuration examples:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-tombstone-handler",
"max.interval": "3000",
"name": "DatagenSourceSmtTombstoneHandler",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "dropValue,tombstoneFail",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
"transforms.tombstoneFail.behavior": "fail"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtTombstoneHandler",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "dropValue,tombstoneFail",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
"transforms.tombstoneFail.behavior": "fail"
}
Transformation example:
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": null
}
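The record above is a tombstone (its value is null). Depending on the configured behavior, the connector either ignores the record (and logs it) or fails. With transforms.tombstoneFail.behavior set to fail, as in the configuration examples above, the connector fails.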
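The two outcomes can be pictured with a small Python sketch. This is a hedged illustration, not the transform's code: the function name handle_tombstone is hypothetical, and the sketch assumes that an ignored tombstone is logged and dropped while the fail behavior raises an error.

```python
import logging


def handle_tombstone(record, behavior="fail"):
    """Pass normal records through; fail on, or log and drop, tombstone records."""
    if record["value"] is not None:
        return record
    if behavior == "fail":
        raise ValueError("Tombstone record encountered on topic " + record["topic"])
    # Assumed non-fail behavior: log the tombstone and drop it.
    logging.warning("Ignoring tombstone record on topic %s", record["topic"])
    return None


tombstone = {"topic": "TestTopic", "key": {"test": "value"}, "value": None}
try:
    handle_tombstone(tombstone, behavior="fail")
except ValueError as error:
    print("failed:", error)
```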
Topic Regex Router¶
Configuration examples:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-users-json",
"max.interval": "3000",
"name": "DatagenSourceSmtTopicRegexRouter",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "addPrefixRegex",
"transforms.addPrefixRegex.type": "io.confluent.connect.cloud.transforms.TopicRegexRouter",
"transforms.addPrefixRegex.regex": ".*",
"transforms.addPrefixRegex.replacement": "prefix_$0"
}
Transformation example:
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "datagen-source-users-json",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "prefix_datagen-source-users-json",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
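The regex and replacement properties above can be tried out with the following Python sketch. It is an approximation under stated assumptions: the function name regex_route is hypothetical, the sketch assumes the topic is renamed only when the regex matches the whole topic name, and Java-style group references such as $0 are translated to Python's \g<0> form. Java and Python regex syntax differ in some details, so treat this only as a way to reason about simple patterns.

```python
import re


def regex_route(topic, regex, replacement):
    """Return the rewritten topic name, or the original if the regex does not match."""
    # Assumption: the pattern must match the entire topic name.
    if re.fullmatch(regex, topic) is None:
        return topic
    # Translate Java-style group references ($0, $1, ...) to Python's \g<n> syntax.
    python_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    # Replace only the first match, so ".*" does not also match the empty tail.
    return re.sub(regex, python_replacement, topic, count=1)


print(regex_route("datagen-source-users-json", ".*", "prefix_$0"))
# prefix_datagen-source-users-json
```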
Value To Key¶
Configuration examples:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-value-to-key",
"max.interval": "3000",
"name": "DatagenSourceSmtValueToKey",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "valueToKey",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "registertime, userid"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtValueToKey",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-users-json",
"transforms": "valueToKey",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "registertime, userid"
}
Transformation example:
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"userid": 1234,
"registertime": "ABC",
"ModifiedBy": "XYZ"
}
}
{
"topic": "TestTopic",
"key": {
"userid": 1234,
"registertime": "ABC"
},
"value": {
"userid": 1234,
"registertime": "ABC",
"ModifiedBy": "XYZ"
}
}
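The key replacement shown above can be sketched in Python as follows. This is an illustration only, with a hypothetical function name value_to_key; it assumes the comma-separated fields setting is split and trimmed before the named fields are copied from the value into a new key.

```python
def value_to_key(record, fields):
    """Replace the record key with a dict built from the named value fields."""
    names = [name.strip() for name in fields.split(",")]
    key = {name: record["value"][name] for name in names}
    # The value is left unchanged; only the key is replaced.
    return {**record, "key": key}


before = {
    "topic": "TestTopic",
    "key": {"test": "value"},
    "value": {"userid": 1234, "registertime": "ABC", "ModifiedBy": "XYZ"},
}
after = value_to_key(before, "registertime, userid")
print(after["key"])  # {'registertime': 'ABC', 'userid': 1234}
```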