SMT Examples

This section provides examples of Single Message Transforms (SMTs) for managed connectors, using the managed Oracle CDC Connector for Confluent Cloud.

TimestampConverter SMT

Suppose you have the following table:

CREATE TABLE CUSTOMERS (
  ID NUMBER(10) NOT NULL PRIMARY KEY,
  FIRST_NAME VARCHAR(50),
  CLUB_STATUS VARCHAR(8),
  COMMENTS VARCHAR(90),
  CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP );

If you run a connector, you may see a record similar to the following:

{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"long":1636048731469},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"589654"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636049137001"},"row_id":null,"username":null}

The timestamp is represented as epoch milliseconds in "CREATE_TS":{"long":1636048731469}, but you may prefer a string in a format that is compatible with java.text.SimpleDateFormat. In that case, you can use the TimestampConverter SMT as shown in the following example:

"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"transforms.TimestampConverter.field": "CREATE_TS",
"transforms.TimestampConverter.target.type": "string"

TimestampConverter SMT converts "CREATE_TS":{"long":1636048731469} to "CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"}:

{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"594158"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636050925327"},"row_id":null,"username":null}

InsertField SMT

To insert a field that specifies the origin of records, add the InsertField SMT configuration properties to the connector configuration as shown in the following example:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database"

After the connector is up and running, if changes (for example, INSERT, UPDATE, or DELETE) happen in the tables you are watching, you will see that "Origin":{"string":"Oracle Database"} is added to records in the table-specific topic, as shown in the following example:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636058280541},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"611987"},"op_type":{"string":"I"},"op_ts":{"string":"1636083480000"},"current_ts":{"string":"1636058282236"},"row_id":{"string":"AAAFhxAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}

If you look at the redo log topic, you will also see "Origin":{"string":"Oracle Database"} being added, and this is probably not what you want:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":611987},"START_SCN":{"long":611987},"COMMIT_SCN":{"long":611988},"TIMESTAMP":{"long":1636083480000},"START_TIMESTAMP":{"long":1636083480000},"COMMIT_TIMESTAMP":{"long":1636083480000},"XIDUSN":{"long":3},"XIDSLT":{"long":33},"XIDSQN":{"long":407},"XID":{"bytes":"\u0003\u0000!\u0000—\u0001\u0000\u0000"},"PXIDUSN":{"long":3},"PXIDSLT":{"long":33},"PXIDSQN":{"long":407},"PXID":{"bytes":"\u0003\u0000!\u0000—\u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhxAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":55},"RBABLK":{"long":13},"RBABYTE":{"long":208},"UBAFIL":{"long":3},"UBABLK":{"long":169},"UBAREC":{"long":19},"UBASQN":{"long":74},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22641},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22641},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 13:38:00.541'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 13:38:00.541') and ROWID = 'AAAFhxAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x000037.0000000d.00d0 
"},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":22},"UNDO_VALUE":{"long":23},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":611988},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false},"Origin":{"string":"Oracle Database"}}

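You can leverage KIP-585: Filter and Conditional SMTs to apply SMTs to certain topics only. The following example uses a TopicNameMatches predicate so that the InsertField SMT is applied only to topics whose names match the pattern ORCL.ADMIN.*: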

"transforms": "InsertField",
"transforms.InsertField.predicate":"has-my-prefix",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database",
"predicates":"has-my-prefix",
"predicates.has-my-prefix.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern":"ORCL.ADMIN.*"

After the connector is up and running, if changes (for example, INSERT, UPDATE, or DELETE) happen in the tables you are interested in, you will see "Origin":{"string":"Oracle Database"} being added to records in the table-specific topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636060255772},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"617372"},"op_type":{"string":"I"},"op_ts":{"string":"1636085455000"},"current_ts":{"string":"1636060258267"},"row_id":{"string":"AAAFhzAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}

It is important to note that "Origin":{"string":"Oracle Database"} won’t show up in the redo log topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":617372},"START_SCN":{"long":617371},"COMMIT_SCN":{"long":617373},"TIMESTAMP":{"long":1636085455000},"START_TIMESTAMP":{"long":1636085455000},"COMMIT_TIMESTAMP":{"long":1636085455000},"XIDUSN":{"long":9},"XIDSLT":{"long":31},"XIDSQN":{"long":416},"XID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"PXIDUSN":{"long":9},"PXIDSLT":{"long":31},"PXIDSQN":{"long":416},"PXID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhzAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":61},"RBABLK":{"long":231},"RBABYTE":{"long":140},"UBAFIL":{"long":3},"UBABLK":{"long":2952},"UBAREC":{"long":12},"UBASQN":{"long":70},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22643},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22643},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 14:10:55.772'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 14:10:55.772') and ROWID = 'AAAFhzAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x00003d.000000e7.008c 
"},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":16},"UNDO_VALUE":{"long":17},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":617373},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false}}