SMT Examples for Oracle CDC Source Connector for Confluent Cloud¶
This section includes Configure Single Message Transforms for Kafka Connectors in Confluent Cloud examples for the fully-managed Oracle CDC Connector for Confluent Cloud.
TimestampConverter SMT¶
Suppose you have the following table:
CREATE TABLE CUSTOMERS (
ID NUMBER(10) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(50),
CLUB_STATUS VARCHAR(8),
COMMENTS VARCHAR(90),
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
If you run a connector, you may see a record similar to the following:
{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"long":1636048731469},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"589654"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636049137001"},"row_id":null,"username":null}
The timestamp is represented in "CREATE_TS":{"long":1636048731469}
, but
sometimes you want to keep the timestamp that is compatible with
java.text.SimpleDateFormat
. For this situation, you can use TimestampConverter SMT
as shown in the following example:
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"transforms.TimestampConverter.field": "CREATE_TS",
"transforms.TimestampConverter.target.type": "string"
TimestampConverter SMT
converts "CREATE_TS":{"long":1636048731469}
to
"CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"}
:
{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"594158"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636050925327"},"row_id":null,"username":null}
InsertField SMT¶
To insert a field to specify the origin of records, add the InsertField
configuration property to the connector configuration as shown in the following
example:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database"
After the connector is up and running, if changes (for example, INSERT
, UPDATE
, or DELETE
) happen in tables you are watching
, you will see
"Origin":{"string":"Oracle Database"}
is added to a table-specific topic
as shown in the following example:
kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636058280541},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"611987"},"op_type":{"string":"I"},"op_ts":{"string":"1636083480000"},"current_ts":{"string":"1636058282236"},"row_id":{"string":"AAAFhxAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}
If you look at the redo log topic, you will also see
"Origin":{"string":"Oracle Database"}
being added, and this is probably not
what you want:
kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":611987},"START_SCN":{"long":611987},"COMMIT_SCN":{"long":611988},"TIMESTAMP":{"long":1636083480000},"START_TIMESTAMP":{"long":1636083480000},"COMMIT_TIMESTAMP":{"long":1636083480000},"XIDUSN":{"long":3},"XIDSLT":{"long":33},"XIDSQN":{"long":407},"XID":{"bytes":"\u0003\u0000!\u0000\u0001\u0000\u0000"},"PXIDUSN":{"long":3},"PXIDSLT":{"long":33},"PXIDSQN":{"long":407},"PXID":{"bytes":"\u0003\u0000!\u0000\u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhxAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":55},"RBABLK":{"long":13},"RBABYTE":{"long":208},"UBAFIL":{"long":3},"UBABLK":{"long":169},"UBAREC":{"long":19},"UBASQN":{"long":74},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22641},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22641},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 13:38:00.541'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 13:38:00.541') and ROWID = 'AAAFhxAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x000037.0000000d.00d0 "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":22},"UNDO_VALUE":{"long":23},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":611988},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false},"Origin":{"string":"Oracle Database"}}
You can leverage KIP-585: Filter and Conditional SMTs to apply SMTs to certain topics only.
"transforms": "InsertField",
"transforms.InsertField.predicate":"has-my-prefix",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database",
"predicates":"has-my-prefix",
"predicates.has-my-prefix.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern":"ORCL.ADMIN.*"
After the connector is up and running, if changes happen to tables of your
interest (for example, INSERT
, UPDATE
, or DELETE
) , you will see
"Origin":{"string":"Oracle Database"}
being added to a table-specific topic:
kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636060255772},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"617372"},"op_type":{"string":"I"},"op_ts":{"string":"1636085455000"},"current_ts":{"string":"1636060258267"},"row_id":{"string":"AAAFhzAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}
It is important to note that "Origin":{"string":"Oracle Database"}
won’t
show up in the redo log topic:
kafka-avro-console-consumer -bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":617372},"START_SCN":{"long":617371},"COMMIT_SCN":{"long":617373},"TIMESTAMP":{"long":1636085455000},"START_TIMESTAMP":{"long":1636085455000},"COMMIT_TIMESTAMP":{"long":1636085455000},"XIDUSN":{"long":9},"XIDSLT":{"long":31},"XIDSQN":{"long":416},"XID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"PXIDUSN":{"long":9},"PXIDSLT":{"long":31},"PXIDSQN":{"long":416},"PXID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhzAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":61},"RBABLK":{"long":231},"RBABYTE":{"long":140},"UBAFIL":{"long":3},"UBABLK":{"long":2952},"UBAREC":{"long":12},"UBASQN":{"long":70},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22643},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22643},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 14:10:55.772'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 14:10:55.772') and ROWID = 'AAAFhzAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x00003d.000000e7.008c "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":16},"UNDO_VALUE":{"long":17},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":617373},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false}}