SMT Examples

This section includes Configure Single Message Transforms for Kafka Connectors in Confluent Cloud examples for the managed Oracle CDC Connector for Confluent Cloud.

TimestampConverter SMT

Suppose you have the following table:

CREATE TABLE CUSTOMERS (
  ID NUMBER(10) NOT NULL PRIMARY KEY,
  FIRST_NAME VARCHAR(50),
  CLUB_STATUS VARCHAR(8),
  COMMENTS VARCHAR(90),
  CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP );

If you run a connector, you may see a record similar to the following:

{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"long":1636048731469},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"589654"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636049137001"},"row_id":null,"username":null}

The timestamp is represented in "CREATE_TS":{"long":1636048731469}, but sometimes you want to keep the timestamp that is compatible with java.text.SimpleDateFormat. For this situation, you can use TimestampConverter SMT as shown in the following example:

"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"transforms.TimestampConverter.field": "CREATE_TS",
"transforms.TimestampConverter.target.type": "string"

TimestampConverter SMT converts "CREATE_TS":{"long":1636048731469} to "CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"}:

{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"594158"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636050925327"},"row_id":null,"username":null}

InsertField SMT

To insert a field to specify the origin of records, add the InsertField configuration property to the connector configuration as shown in the following example:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database"

After the connector is up and running, if changes (for example, INSERT, UPDATE, or DELETE) happen in tables you are watching , you will see "Origin":{"string":"Oracle Database"} is added to a table-specific topic as shown in the following example:

kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636058280541},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"611987"},"op_type":{"string":"I"},"op_ts":{"string":"1636083480000"},"current_ts":{"string":"1636058282236"},"row_id":{"string":"AAAFhxAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}

If you look at the redo log topic, you will also see "Origin":{"string":"Oracle Database"} being added, and this is probably not what you want:

kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":611987},"START_SCN":{"long":611987},"COMMIT_SCN":{"long":611988},"TIMESTAMP":{"long":1636083480000},"START_TIMESTAMP":{"long":1636083480000},"COMMIT_TIMESTAMP":{"long":1636083480000},"XIDUSN":{"long":3},"XIDSLT":{"long":33},"XIDSQN":{"long":407},"XID":{"bytes":"\u0003\u0000!\u0000—\u0001\u0000\u0000"},"PXIDUSN":{"long":3},"PXIDSLT":{"long":33},"PXIDSQN":{"long":407},"PXID":{"bytes":"\u0003\u0000!\u0000—\u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhxAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":55},"RBABLK":{"long":13},"RBABYTE":{"long":208},"UBAFIL":{"long":3},"UBABLK":{"long":169},"UBAREC":{"long":19},"UBASQN":{"long":74},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22641},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22641},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 13:38:00.541'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 13:38:00.541') and ROWID = 'AAAFhxAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x000037.0000000d.00d0 "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":22},"UNDO_VALUE":{"long":23},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":611988},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false},"Origin":{"string":"Oracle Database"}}

You can leverage KIP-585: Filter and Conditional SMTs to apply SMTs to certain topics only.

"transforms": "InsertField",
"transforms.InsertField.predicate":"has-my-prefix",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database",
"predicates":"has-my-prefix",
"predicates.has-my-prefix.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern":"ORCL.ADMIN.*"

After the connector is up and running, if changes happen to tables of your interest (for example, INSERT, UPDATE, or DELETE) , you will see "Origin":{"string":"Oracle Database"} being added to a table-specific topic:

kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636060255772},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"617372"},"op_type":{"string":"I"},"op_ts":{"string":"1636085455000"},"current_ts":{"string":"1636060258267"},"row_id":{"string":"AAAFhzAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}

It is important to note that "Origin":{"string":"Oracle Database"} won’t show up in the redo log topic:

kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":617372},"START_SCN":{"long":617371},"COMMIT_SCN":{"long":617373},"TIMESTAMP":{"long":1636085455000},"START_TIMESTAMP":{"long":1636085455000},"COMMIT_TIMESTAMP":{"long":1636085455000},"XIDUSN":{"long":9},"XIDSLT":{"long":31},"XIDSQN":{"long":416},"XID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"PXIDUSN":{"long":9},"PXIDSLT":{"long":31},"PXIDSQN":{"long":416},"PXID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhzAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":61},"RBABLK":{"long":231},"RBABYTE":{"long":140},"UBAFIL":{"long":3},"UBABLK":{"long":2952},"UBAREC":{"long":12},"UBASQN":{"long":70},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22643},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22643},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 14:10:55.772'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 14:10:55.772') and ROWID = 'AAAFhzAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x00003d.000000e7.008c "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":16},"UNDO_VALUE":{"long":17},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":617373},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false}}