Testing Scenarios and Examples for Oracle CDC Source Connector for Confluent Platform¶
The following sections provide testing scenarios and examples for running the Kafka Connect Oracle CDC Source connector.
Testing scenarios¶
After installing the connector, you can complete the following testing scenarios before running the connector in a production environment.
Capturing Redo logs only¶
If not running, start Confluent Platform.
confluent local services start
Create the following connector configuration JSON file and save the file as `config1.json`. Note the following configuration property entries:

- Configure the connector with a new `name`.
- Set `table.topic.name.template` to an empty string.
- Set `table.inclusion.regex` to capture several tables.
- (Optional) Use `redo.log.topic.name` to rename the redo log topic.
- (Optional) Set `redo.log.corruption.topic` to specify the topic where you want to record corrupted records.
{ "name": "SimpleOracleCDC_1", "config":{ "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector", "name": "SimpleOracleCDC_1", "tasks.max":1, "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers":"localhost:9092", "oracle.server": "<database-url>", "oracle.port": 1521, "oracle.sid":"<SID of the CDB>", "oracle.pdb.name":"<name of the PDB where tables reside>", "oracle.username": "<username e.g. C##MYUSER>", "oracle.password": "<password>", "start.from":"snapshot", "redo.log.topic.name": "redo-log-topic-1", "table.inclusion.regex":"<regex-expression>", "_table.topic.name.template_":"Set to an empty string to disable generating change event records", "table.topic.name.template": "", "connection.pool.max.size": 20, "confluent.topic.replication.factor":1, "topic.creation.groups": "redo", "topic.creation.redo.include": "redo-log-topic", "topic.creation.redo.replication.factor": 3, "topic.creation.redo.partitions": 1, "topic.creation.redo.cleanup.policy": "delete", "topic.creation.redo.retention.ms": 1209600000, "topic.creation.default.replication.factor": 3, "topic.creation.default.partitions": 5, "topic.creation.default.cleanup.policy": "compact" } }
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config1.json http://localhost:8083/connectors | jq
Enter the following command to get the connector status:
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_1/status | jq
Verify the following connector operations are successful:
- The connector is started with one running task (see the following note).
- The connector produces records whenever DML events (`INSERT`, `UPDATE`, and `DELETE`) occur for captured tables (see the example DML statements after the note below).
- The connector does not produce records for tables that were not included in the regex or were explicitly excluded with `table.exclusion.regex`.
- If the `redo.log.corruption.topic` is configured, the connector sends corrupted records to the specified corruption topic.
Note
If using the property `"start.from":"snapshot"`, the redo log topic contains only database operations completed after the connector starts.
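For example, you might generate DML events with statements like the following sketch. The table and column names are hypothetical placeholders; substitute one of your captured tables (the `CUSTOMERS` table here matches the output examples later on this page):

-- Hypothetical DML against a captured table; adjust schema, table, and column names
INSERT INTO CUSTOMERS (ID, FIRST_NAME, CLUB_STATUS) VALUES (100, 'Rica', 'bronze');
UPDATE CUSTOMERS SET CLUB_STATUS = 'gold' WHERE ID = 100;
DELETE FROM CUSTOMERS WHERE ID = 100;
COMMIT;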
Enter the following command to check Kafka topics:

kafka-topics --list --zookeeper localhost:2181
If there are operations on the tables after the connector starts, you should see the topic configured by the `redo.log.topic.name` property. If no operations have occurred, there should be nothing displayed other than internal topics.

Consume records using the Avro console consumer:
kafka-avro-console-consumer --topic redo-log-topic-1 \
  --partition 0 --offset earliest --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081
If there are operations on the tables after the connector starts, you should see records displayed. If no operations have occurred, there should be no records.
Check for errors in the log:
confluent local services connect log | grep "ERROR"
After you finish testing, enter the following command to clean up the running configuration:
confluent local services destroy
Capturing Redo logs and Change Event logs¶
If not running, start Confluent Platform.
confluent local services start
Create the following connector configuration JSON file and save the file as `config2.json`.

{
  "name": "SimpleOracleCDC_2",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "SimpleOracleCDC_2",
    "tasks.max": 3,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "oracle.server": "<database-url>",
    "oracle.port": 1521,
    "oracle.sid": "<SID of the CDB>",
    "oracle.pdb.name": "<name of the PDB where tables reside>",
    "oracle.username": "<username e.g. C##MYUSER>",
    "oracle.password": "<password>",
    "start.from": "snapshot",
    "redo.log.topic.name": "redo-log-topic-2",
    "redo.log.consumer.bootstrap.servers": "localhost:9092",
    "table.inclusion.regex": "<regex-expression>",
    "_table.topic.name.template_": "Using template vars to set change event topic for each table",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic-2",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact"
  }
}
Create `redo-log-topic-2`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

bin/kafka-topics --create --topic redo-log-topic-2 \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config2.json http://localhost:8083/connectors | jq
Enter the following command to get the connector status:
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_2/status | jq
Verify the connector starts with three running tasks.
Perform `INSERT`, `UPDATE`, and `DELETE` row operations for each table and verify the following expected results:

- The connector creates a redo log topic and change event log topics for each captured table.
- Redo log events are generated starting from the current time.
- The change event log for each table contains snapshot events (`op_type=R`) followed by other types of events.
Enter the following command to check Kafka topics:
kafka-topics --list --zookeeper localhost:2181
You should see the topic configured by the `redo.log.topic.name` property and topics in the form of `${databaseName}.${schemaName}.${tableName}`.

Consume records using the Avro console consumer:

kafka-avro-console-consumer --topic redo-log-topic-2 \
  --partition 0 --offset earliest --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081
You should see change event records with `op_type=I` (insert), `op_type=U` (update), or `op_type=D` (delete).
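To see these change event records, you can also consume one of the table-specific topics created from the `${databaseName}.${schemaName}.${tableName}` template. The topic name below is a hypothetical example that matches the sample records shown later on this page:

kafka-avro-console-consumer --topic ORCL.ADMIN.CUSTOMERS \
  --from-beginning --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081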
Check for errors in the log:

confluent local services connect log | grep "ERROR"
After you finish testing, enter the following command to clean up the running configuration:
confluent local services destroy
Starting from a specific SCN (without snapshot)¶
If not running, start Confluent Platform.
confluent local services start
Create the following connector configuration JSON file and save it as `config3.json`.

You must choose an Oracle System Change Number (SCN) or timestamp that is still covered by (at minimum) one redo log, and that log must be applicable to one of the included tables. You can use `SELECT CURRENT_SCN FROM v$database;` to query the current SCN of the database.
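As a sketch, the following queries show the current SCN and convert a point in time to an SCN; the timestamp value is a hypothetical example and must still be covered by your available redo logs:

-- Current SCN of the database
SELECT CURRENT_SCN FROM V$DATABASE;

-- Convert a timestamp to an SCN
SELECT TIMESTAMP_TO_SCN(TO_TIMESTAMP('2021-10-21 09:00:00', 'YYYY-MM-DD HH24:MI:SS')) AS SCN FROM DUAL;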
{
  "name": "SimpleOracleCDC_3",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "SimpleOracleCDC_3",
    "tasks.max": 3,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "oracle.server": "<database-url>",
    "oracle.port": 1521,
    "oracle.sid": "<SID of the CDB>",
    "oracle.pdb.name": "<name of the PDB where tables reside>",
    "oracle.username": "<username e.g. C##MYUSER>",
    "oracle.password": "<password>",
    "_start.from_": "Set to a proper scn or timestamp to start without snapshotting tables",
    "start.from": "<SCN>",
    "redo.log.topic.name": "redo-log-topic-3",
    "redo.log.consumer.bootstrap.servers": "localhost:9092",
    "table.inclusion.regex": "<regex-expression>",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic-3",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact"
  }
}
Create `redo-log-topic-3`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

bin/kafka-topics --create --topic redo-log-topic-3 \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config3.json http://localhost:8083/connectors | jq
Enter the following command to get the connector status:
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_3/status | jq
Verify that the connector is started with three running tasks.
Perform `INSERT`, `UPDATE`, and `DELETE` row operations for each table and verify the following expected results:

- The connector creates a redo log topic and change event log topics for each captured table.
- Redo log events are generated starting from the current time.
- The change event log for each table starts from the specified `start.from` value and does not contain snapshot events (`op_type=R`).
Enter the following command to check Kafka topics:
kafka-topics --list --zookeeper localhost:2181
You should see the topic configured by the `redo.log.topic.name` property and topics in the form of `${databaseName}.${schemaName}.${tableName}`.

Consume records using the Avro console consumer:

kafka-avro-console-consumer --topic redo-log-topic-3 \
  --partition 0 --offset earliest --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081
The table-specific topics should contain records starting from the specified `start.from` SCN and should not contain snapshot records (`op_type=R`).

Check for errors in the log:
confluent local services connect log | grep "ERROR"
After you finish testing, enter the following command to clean up the running configuration:
confluent local services destroy
Capturing LOB type columns¶
The following example shows how to capture LOB type columns.
Prerequisites¶
- A table that contains columns stored as LOB (BLOB, CLOB, or NCLOB) data type.
- The table must have a primary key.
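For example, a table meeting these prerequisites might look like the following sketch; the table and column names are hypothetical:

CREATE TABLE DOCUMENTS (
  ID    NUMBER(10) NOT NULL PRIMARY KEY,
  TITLE VARCHAR2(100),
  BODY  CLOB,
  IMAGE BLOB
);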
Procedure¶
If not running, start Confluent Platform.
confluent local services start
Create the following connector configuration JSON file and save it as `config4.json`.

{
  "name": "SimpleOracleCDC_4",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "SimpleOracleCDC_4",
    "tasks.max": 2,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "oracle.server": "<database-url>",
    "oracle.port": 1521,
    "oracle.sid": "<SID of the CDB>",
    "oracle.pdb.name": "<name of the PDB where tables reside>",
    "oracle.username": "<username e.g. C##MYUSER>",
    "oracle.password": "<password>",
    "start.from": "snapshot",
    "redo.log.topic.name": "redo-log-topic-4",
    "redo.log.consumer.bootstrap.servers": "localhost:9092",
    "table.inclusion.regex": "<regex-expression>",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "_lob.topic.name.template_": "Using template vars to set lob topic for each table",
    "lob.topic.name.template": "${tableName}.${columnName}_topic",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic-4",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact"
  }
}
Create `redo-log-topic-4`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

bin/kafka-topics --create --topic redo-log-topic-4 \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config4.json http://localhost:8083/connectors | jq
Enter the following command and verify that the connector is started with two running tasks.
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_4/status | jq
Perform `INSERT`, `UPDATE`, and `DELETE` row operations for each table and verify the following expected results:

- The redo log topic is created.
- Change event topics are created for each captured table.
- LOB topics are created for each LOB column.
- The key of each LOB topic record contains the following information:

  {
    "table": "<dot-separated, fully-qualified table name>",
    "column": "<column name of the LOB column>",
    "primary_key": "<primary key of the change event topic after applying ${key.template}>"
  }

- The value of each LOB topic record is the LOB value.
- When a row is deleted from the table, the corresponding LOB is deleted from the LOB topic. The connector writes a tombstone record (null value) to the LOB topic.
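To inspect a LOB topic, you can consume it with the record keys printed. The topic name below is a hypothetical result of the `${tableName}.${columnName}_topic` template above, using the `DOCUMENTS` table sketched in the prerequisites:

kafka-avro-console-consumer --topic DOCUMENTS.BODY_topic \
  --from-beginning --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true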
After you finish testing, enter the following command to clean up the running configuration:
confluent local services destroy
Using Kerberos authentication¶
Use the following example to set up the connector to use Kerberos version 5 (v5). The example uses the redo logs to show how the configuration is set up.
Note
- This connector only supports authentication with Kerberos v5.
- The previous examples in this section can also be used to verify Kerberos v5 functionality. Ensure the following prerequisites are met and add the additional Kerberos property to the connector configuration.
Prerequisites¶
- You must enable Kerberos on your Oracle Database instance. For instructions, see Configuring Kerberos Authentication. If your Oracle Database conforms to multi-tenant architecture (that is, has a container and pluggable databases), ensure the connector user account created is a common user and you set the appropriate user privileges for the user.
- A Kerberos Key Distribution Center (KDC) must be configured within the domain.
- The KDC must be accessible from the Connect workers. To do this, ensure the `krb5.conf` file on each Connect worker correctly maps your realm to the KDC.
- A Kerberos user principal for the connector must exist in the KDC and in Oracle Database, and the corresponding keytab file must be copied to each Connect worker.
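As a sketch, a common user that authenticates externally through Kerberos might be created as follows. The principal, realm, and granted privilege are placeholders; grant the full set of privileges the connector requires as described in the connector documentation:

-- Hypothetical common user mapped to a Kerberos principal
CREATE USER C##MYUSER IDENTIFIED EXTERNALLY AS 'connectuser@EXAMPLE.COM' CONTAINER=ALL;
GRANT CREATE SESSION TO C##MYUSER CONTAINER=ALL;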
Procedure¶
In your Connect worker, run the following command:
kinit -kt /path/to/the/keytab --renewable -f <your-connect-user-principal>
The flags `--renewable` and `-f` are required when using `kinit`, since a long-running connector has to renew the ticket-granting ticket (TGT) and the tickets must be forwardable (`-f`).

Tip
You can add the flag `-c /path/to/credentials/cache/file` to save your credentials cache file in a preferred location. The credentials cache file is the only requirement for authenticating the connector to Oracle using Kerberos.
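To confirm that a ticket was obtained (and, if you used `-c`, that it landed in the expected cache file), you can inspect the credentials cache; the path is a placeholder:

klist -c /path/to/credentials/cache/file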
If not running, start Confluent Platform.

confluent local services start
Create the following connector configuration JSON file and save it as `config5.json`.

Note

The Oracle CDC Source connector uses the `oracle.kerberos.cache.file` configuration property to specify the path to the cache file generated previously.

{
  "name": "SimpleOracleCDC_5",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "SimpleOracleCDC_5",
    "tasks.max": 1,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "oracle.server": "<database-url>",
    "oracle.port": 1521,
    "oracle.sid": "<SID of the CDB>",
    "oracle.pdb.name": "<name of the PDB where tables reside>",
    "oracle.username": "<C##MYUSER>",
    "oracle.password": "<password>",
    "start.from": "snapshot",
    "redo.log.topic.name": "redo-log-topic-5",
    "table.inclusion.regex": "<regex-expression>",
    "_table.topic.name.template_": "Set to an empty string to disable generating change event records",
    "table.topic.name.template": "",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic-5",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact",
    "oracle.kerberos.cache.file": "</path/to/my/kerberos/credentials-file>"
  }
}
Create `redo-log-topic-5`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

bin/kafka-topics --create --topic redo-log-topic-5 \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config5.json http://localhost:8083/connectors | jq
Enter the following command and verify that the connector is started with one running task.
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_5/status | jq
Perform `INSERT`, `UPDATE`, and `DELETE` row operations for each table and verify the following expected results:

- The connector starts and has one running task.
- Change event topics are created for each captured table.
- The connector does not produce records for tables that were not included in the regex or were explicitly excluded using the `table.exclusion.regex` property.
- If the `redo.log.corruption.topic` property was configured, the connector sends corrupted records to the specified corruption topic.
Using SSL for encrypted communication and server/client authentication¶
Use the following example to set up the connector to use SSL for one-way and two-way (mutual) authentication. The example uses the redo logs to show how the configuration is set up.
Prerequisites¶
- Oracle database server with SSL configured. Refer to Oracle documentation such as https://docs.oracle.com/database/121/DBSEG/asossl.htm#DBSEG070 or https://www.oracle.com/technetwork/database/enterprise-edition/wp-oracle-jdbc-thin-ssl-130128.pdf for configuration details.
- For server authentication, a local truststore with an imported trusted server certificate.
- For client or mutual authentication, a local keystore containing client keys (trusted by the server).
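As a sketch, you might build the truststore by importing the trusted server certificate with `keytool`; the paths, alias, and password are placeholders:

keytool -importcert -alias oracle-server \
  -file /path/to/server-certificate.crt \
  -keystore /path/to/truststore.jks \
  -storepass <truststore-password> -noprompt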
Procedure¶
If not running, start Confluent Platform.
confluent local services start
Create the following connector configuration JSON file and save it as `config6.json`.

Note

Note the following properties (shown in the example):

- The Oracle CDC Source connector uses the `oracle.ssl.truststore.file` and `oracle.ssl.truststore.password` properties to specify the location of the truststore containing the trusted server certificate and the truststore password.
- The pass-through properties `oracle.connection.javax.net.ssl.keyStore` and `oracle.connection.javax.net.ssl.keyStorePassword` are also used to supply the keystore location and password.

{
  "name": "SimpleOracleCDC_6",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "SimpleOracleCDC_6",
    "tasks.max": 1,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "oracle.server": "<database-url>",
    "oracle.port": 1521,
    "oracle.sid": "<SID of the CDB>",
    "oracle.pdb.name": "<name of the PDB where tables reside>",
    "oracle.username": "<C##MYUSER>",
    "oracle.password": "<password>",
    "start.from": "snapshot",
    "redo.log.topic.name": "redo-log-topic-6",
    "table.inclusion.regex": "<regex-expression>",
    "_table.topic.name.template_": "Set to an empty string to disable generating change event records",
    "table.topic.name.template": "",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic-6",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact",
    "oracle.ssl.truststore.file": "</path/to/truststore/file/containing/server/certs>",
    "oracle.ssl.truststore.password": "<password>",
    "oracle.connection.javax.net.ssl.keyStore": "</path/to/keystore/file>",
    "oracle.connection.javax.net.ssl.keyStorePassword": "<password>"
  }
}
Create `redo-log-topic-6`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

bin/kafka-topics --create --topic redo-log-topic-6 \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config6.json http://localhost:8083/connectors | jq
Enter the following command and verify that the connector is started with one running task.
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_6/status | jq
Perform `INSERT`, `UPDATE`, and `DELETE` row operations for each table and verify the following expected results:

- The connector starts and has one running task.
- Change event topics are created for each captured table.
- The connector does not produce records for tables that were not included in the regex or were explicitly excluded using the `table.exclusion.regex` property.
- If the `redo.log.corruption.topic` property was configured, the connector sends corrupted records to the specified corruption topic.
Connecting to an Oracle RAC cluster using service name¶
The following example demonstrates how to configure the connector to connect to an Oracle RAC cluster using the service name instead of SID.
Prerequisites¶
- Oracle RAC cluster configured with a service name and multiple database instances
Procedure¶
If not running, start Confluent Platform.
confluent local services start
Create the following connector configuration JSON file and save it as `config7.json`.

Note

- The `oracle.service.name` property specifies the service name to use when connecting to the RAC cluster.
- An `oracle.sid` is still required. It can be the SID of any of the database instances.

{
  "name": "SimpleOracleCDC_7",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "SimpleOracleCDC_7",
    "tasks.max": 1,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "oracle.server": "<database-url>",
    "oracle.port": 1521,
    "oracle.sid": "<SID of any of the database instances>",
    "oracle.service.name": "<service name of the RAC cluster>",
    "oracle.pdb.name": "<name of the PDB where tables reside>",
    "oracle.username": "<C##MYUSER>",
    "oracle.password": "<password>",
    "start.from": "snapshot",
    "redo.log.topic.name": "redo-log-topic-7",
    "table.inclusion.regex": "<regex-expression>",
    "_table.topic.name.template_": "Set to an empty string to disable generating change event records",
    "table.topic.name.template": "",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic-7",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact"
  }
}
Create `redo-log-topic-7`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

bin/kafka-topics --create --topic redo-log-topic-7 \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config7.json http://localhost:8083/connectors | jq
Enter the following command and verify that the connector is started with one running task.
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_7/status | jq
Perform `INSERT`, `UPDATE`, and `DELETE` row operations for each table and verify the following expected results:

- The connector starts and has one running task.
- Change event topics are created for each captured table.
- The connector does not produce records for tables that were not included in the regex or were explicitly excluded using the `table.exclusion.regex` property.
- If the `redo.log.corruption.topic` property was configured, the connector sends corrupted records to the specified corruption topic.
Capturing Redo logs and Snapshot with Supplemental logging only¶
If not running, start Confluent Platform.
confluent local services start
Create the following connector configuration JSON file and save the file as `config8.json`. Note the following configuration property entries:

- Configure the connector with a new `name`.
- Set `table.inclusion.regex` to capture several tables.
- (Optional) Set `redo.log.corruption.topic` to specify the topic where you want to record corrupted records.
{ "name": "SimpleOracleCDC_8", "config":{ "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector", "name": "SimpleOracleCDC_8", "tasks.max": 3, "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers": "localhost:9092", "oracle.server": "<database-url>", "oracle.port": 1521, "oracle.sid": "<SID of the CDB>", "oracle.pdb.name": "<name of the PDB where tables reside>", "oracle.username": "<username e.g C##MYUSER>", "oracle.password": "<password>", "oracle.supplemental.log.level": "msl", "start.from": "snapshot", "redo.log.topic.name": "redo-log-topic-2", "redo.log.consumer.bootstrap.servers": "localhost:9092", "table.inclusion.regex": "<regex-expression>", "_table.topic.name.template_": "Using template vars to set change event topic for each table", "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}", "connection.pool.max.size": 20, "confluent.topic.replication.factor": 1, "topic.creation.groups": "redo", "topic.creation.redo.include": "redo-log-topic-8", "topic.creation.redo.replication.factor": 3, "topic.creation.redo.partitions": 1, "topic.creation.redo.cleanup.policy": "delete", "topic.creation.redo.retention.ms": 1209600000, "topic.creation.default.replication.factor": 3, "topic.creation.default.partitions": 5, "topic.creation.default.cleanup.policy": "compact" } }
Enter the following command to start the connector:
curl -s -X POST -H 'Content-Type: application/json' --data @config8.json http://localhost:8083/connectors | jq
Enter the following command to get the connector status:
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_8/status | jq
Verify the following connector operations are successful:
- The connector is started with three running tasks (see the following note).
- The connector produces snapshot records for captured tables.
Note
If using the property `"start.from":"snapshot"`, the redo log topic contains only database operations completed after the connector starts.

Enter the following command to check Kafka topics:
kafka-topics --list --zookeeper localhost:2181
You should see the topic configured by the `redo.log.topic.name` property and topics in the form of `${databaseName}.${schemaName}.${tableName}`.

Consume records using the Avro console consumer:

kafka-avro-console-consumer --topic redo-log-topic-8 \
  --partition 0 --offset earliest --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081
If there are operations on the tables after the connector starts, you should see records displayed. If no operations have occurred, there should be no records.
Check for errors in the log:
confluent local services connect log | grep "ERROR"
After you finish testing, enter the following command to clean up the running configuration:
confluent local services destroy
Configuration examples for running against Confluent Cloud¶
The following configuration examples allow you to run the connector against Confluent Cloud.
Note
The configuration examples are based on a connector running locally using Confluent Platform version 6.0 (and later).
Using the Avro converter with Schema Registry¶
The following worker and connector configuration examples are for running the Oracle CDC Source connector against Confluent Cloud with Confluent Cloud Schema Registry and the Avro (`io.confluent.connect.avro.AvroConverter`) value converter.
Distributed worker configuration¶
Create your `my-connect-distributed.properties` file based on the following example.

bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
request.timeout.ms=20000
retry.backoff.ms=500

producer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
producer.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500

consumer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500

offset.flush.interval.ms=10000
offset.storage.file.filename=/tmp/connect.offsets
group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

# Schema Registry specific settings
# We recommend you use Confluent Cloud Schema Registry if you run Oracle CDC Source against Confluent Cloud
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
value.converter.schema.registry.url=<your-schema-registry-url e.g https://xyz.us-east-2.aws.confluent.cloud>

# Confluent license settings
confluent.topic.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.mechanism=PLAIN

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,<path-to>/confluent-6.0.0/share/confluent-hub-components
Start Kafka Connect with the following command:
<path-to-confluent-home>/bin/connect-distributed my-connect-distributed.properties
Connector configuration¶
Create your `oracle-cdc-confluent-cloud.json` file based on the following example:

{
  "name": "OracleCDC_Confluent_Cloud",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "OracleCDC_Confluent_Cloud",
    "tasks.max": 3,
    "oracle.server": "<database-url>",
    "oracle.sid": "<SID of the CDB>",
    "oracle.pdb.name": "<name of the PDB where tables reside. If you don't have PDB, remove this config property>",
    "oracle.username": "<username e.g. C##MYUSER>",
    "oracle.password": "<password>",
    "start.from": "snapshot",
    "redo.log.topic.name": "oracle-redo-log-topic",
    "redo.log.consumer.bootstrap.servers": "<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
    "redo.log.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
    "redo.log.consumer.security.protocol": "SASL_SSL",
    "redo.log.consumer.sasl.mechanism": "PLAIN",
    "table.inclusion.regex": "<regex-expression e.g. ORCL[.]ADMIN[.]MARIPOSA.*>",
    "_table.topic.name.template_": "Using template vars to set change event topic for each table",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 3,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "oracle-redo-log-topic",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact",
    "confluent.topic.bootstrap.servers": "<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "<schema-registry-api-key>:<schema-registry-api-secret>",
    "value.converter.schema.registry.url": "<your-schema-registry-url e.g https://xyz.us-east-2.aws.confluent.cloud>"
  }
}
Create `oracle-redo-log-topic`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

Confluent Platform CLI
bin/kafka-topics --create --topic oracle-redo-log-topic \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Confluent Cloud CLI
confluent kafka topic create oracle-redo-log-topic \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Start the Oracle CDC Source connector with the following command:
curl -s -H "Content-Type: application/json" -X POST -d @oracle-cdc-confluent-cloud.json http://localhost:8083/connectors/ | jq
Using the JSON (schemaless) converter¶
The following worker and connector configuration examples are for running the Oracle CDC Source connector against Confluent Cloud with the (schemaless) JSON (`org.apache.kafka.connect.json.JsonConverter`) value converter.
Distributed worker configuration¶
Create your `my-connect-distributed-json.properties` file based on the following example.

bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
request.timeout.ms=20000
retry.backoff.ms=500

producer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
producer.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500

consumer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500

offset.flush.interval.ms=10000
offset.storage.file.filename=/tmp/connect.offsets
group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

# Confluent license settings
confluent.topic.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.mechanism=PLAIN

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,<path-to>/confluent-6.0.0/share/confluent-hub-components

# Enable source connectors to create topics
# KIP-158
topic.creation.enable=true
Start Kafka Connect with the following command:
<path-to-confluent-home>/bin/connect-distributed my-connect-distributed-json.properties
Connector configuration¶
Create your `oracle-cdc-confluent-cloud-json.json` file based on the following example:

{
  "name": "OracleCDC_Confluent_Cloud",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "OracleCDC_Confluent_Cloud",
    "tasks.max": 3,
    "oracle.server": "<database-url>",
    "oracle.sid": "<SID of the CDB>",
    "oracle.pdb.name": "<name of the PDB where tables reside. If you don't have PDB, remove this config property>",
    "oracle.username": "<username e.g. C##MYUSER>",
    "oracle.password": "<password>",
    "start.from": "snapshot",
    "redo.log.topic.name": "oracle-redo-log-topic",
    "redo.log.consumer.bootstrap.servers": "<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
    "redo.log.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
    "redo.log.consumer.security.protocol": "SASL_SSL",
    "redo.log.consumer.sasl.mechanism": "PLAIN",
    "table.inclusion.regex": "<regex-expression e.g. ORCL[.]ADMIN[.]MARIPOSA.*>",
    "_table.topic.name.template_": "Using template vars to set change event topic for each table",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 3,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "oracle-redo-log-topic",
    "topic.creation.redo.replication.factor": 3,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 5,
    "topic.creation.default.cleanup.policy": "compact",
    "confluent.topic.bootstrap.servers": "<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
Create `oracle-redo-log-topic`. Make sure the topic name matches the value you put for `"redo.log.topic.name"`.

Confluent Platform CLI
bin/kafka-topics --create --topic oracle-redo-log-topic \
  --bootstrap-server broker:9092 --replication-factor 1 \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Confluent Cloud CLI
confluent kafka topic create oracle-redo-log-topic \
  --partitions 1 --config cleanup.policy=delete \
  --config retention.ms=1209600000
Start the Oracle CDC Source connector using the following command:
curl -s -H "Content-Type: application/json" -X POST -d @oracle-cdc-confluent-cloud-json.json http://localhost:8083/connectors/ | jq
Oracle CDC Source Connector output examples¶
The following are examples of the output the Oracle CDC Source Connector creates.
The CUSTOMERS table is created by Admin on Amazon RDS for Oracle:
CREATE TABLE CUSTOMERS (
ID NUMBER(10) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(50),
CLUB_STATUS VARCHAR(8),
COMMENTS VARCHAR(90),
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
The connector is configured with the Avro converter:
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
Table-specific topic: Schemas and records created by the Oracle CDC Connector with AvroConverter¶
Schema for Kafka Record Key
The schema depends on the `key.template`. A `long` type is used in this example because the primary key on the table is `ID NUMBER(10) NOT NULL PRIMARY KEY`:
long
Schema for Kafka Record Value
{
"fields": [
{
"name": "ID",
"type": "long"
},
{
"default": null,
"name": "FIRST_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "CLUB_STATUS",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "COMMENTS",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "CREATE_TS",
"type": [
"null",
{
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
]
},
{
"default": null,
"name": "table",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "scn",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "op_type",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "op_ts",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "current_ts",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "row_id",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "username",
"type": [
"null",
"string"
]
}
],
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"type": "record"
}
Kafka record key for a snapshot
1
Kafka record value for a snapshot
{
"ID": 1,
"FIRST_NAME": {
"string": "Rica"
},
"CLUB_STATUS": {
"string": "bronze"
},
"COMMENTS": {
"string": "Universal optimal hierarchy"
},
"CREATE_TS": {
"long": 1634834346041
},
"table": {
"string": "ORCL.ADMIN.CUSTOMERS"
},
"scn": {
"string": "582847"
},
"op_type": {
"string": "R"
},
"op_ts": null,
"current_ts": {
"string": "1634834582666"
},
"row_id": null,
"username": null
}
Kafka record key for a new insert after Oracle CDC Connector is running
5
Kafka record value for a new insert after Oracle CDC Connector is running
{
"ID": 5,
"FIRST_NAME": {
"string": "Hansiain"
},
"CLUB_STATUS": {
"string": "platinum"
},
"COMMENTS": {
"string": "Centralized full-range approach"
},
"CREATE_TS": {
"long": 1634834933061
},
"table": {
"string": "ORCL.ADMIN.CUSTOMERS"
},
"scn": {
"string": "584627"
},
"op_type": {
"string": "I"
},
"op_ts": {
"string": "1634860133000"
},
"current_ts": {
"string": "1634834938235"
},
"row_id": {
"string": "AAAFhYAAAAAAAMcAAE"
},
"username": {
"string": "ADMIN"
}
}
Use the configuration parameter `output.before.state.field` to have the connector include the before state in the change event records for update operations. In the following example, the value specified for this configuration parameter is `before`.

There are no changes to the change event records for snapshot, insert, and delete operations when using this feature.
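For reference, a minimal, hypothetical fragment of the connector configuration with this parameter set (all other required properties omitted) looks like this:

{
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "output.before.state.field": "before"
  }
}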
Schema for Kafka Record Value
{
"fields": [
{
"name": "ID",
"type": "long"
},
{
"default": null,
"name": "FIRST_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "CLUB_STATUS",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "COMMENTS",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "CREATE_TS",
"type": [
"null",
{
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
]
},
{
"default": null,
"name": "table",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "scn",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "op_type",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "op_ts",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "current_ts",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "row_id",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "username",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "before",
"type": [
"null",
{
"fields": [
{
"name": "ID",
"type": "long"
},
{
"default": null,
"name": "FIRST_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "CLUB_STATUS",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "COMMENTS",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "CREATE_TS",
"type": [
"null",
{
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
]
}
],
"name": "ConnectDefault2",
"type": "record"
}
]
}
],
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"type": "record"
}
Kafka record key for an update after Oracle CDC Connector is running
5
Kafka record value for an update after Oracle CDC Connector is running
{
"ID": 5,
"FIRST_NAME": {
"string": "Hansiain"
},
"CLUB_STATUS": {
"string": "platinum"
},
"COMMENTS": {
"string": "Centralized full-range approach"
},
"CREATE_TS": {
"long": 1634834933061
},
"table": {
"string": "ORCLCDB.C##MYUSER.CUSTOMERS"
},
"scn": {
"string": "2162621"
},
"op_type": {
"string": "U"
},
"op_ts": {
"string": "1715765856000"
},
"current_ts": {
"string": "1715765857516"
},
"row_id": {
"string": "AAAR34AAHAAAAFbAAG"
},
"username": {
"string": "SYS"
},
"before": {
"io.confluent.connect.avro.ConnectDefault2": {
"ID": 5,
"FIRST_NAME": {
"string": "Hansiain"
},
"CLUB_STATUS": {
"string": "gold"
},
"COMMENTS": {
"string": "Centralized full-range approach"
},
"CREATE_TS": {
"long": 1634834933061
}
}
}
}
Redo log topic: Schemas and records created by Oracle CDC Connector with AvroConverter¶
Schema for Kafka Record Key
[
"null",
"string"
]
Schema for Kafka Record Value
{
"fields": [
{
"default": null,
"name": "SCN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "START_SCN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "COMMIT_SCN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "TIMESTAMP",
"type": [
"null",
{
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
]
},
{
"default": null,
"name": "START_TIMESTAMP",
"type": [
"null",
{
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
]
},
{
"default": null,
"name": "COMMIT_TIMESTAMP",
"type": [
"null",
{
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
]
},
{
"default": null,
"name": "XIDUSN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "XIDSLT",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "XIDSQN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "XID",
"type": [
"null",
"bytes"
]
},
{
"default": null,
"name": "PXIDUSN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "PXIDSLT",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "PXIDSQN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "PXID",
"type": [
"null",
"bytes"
]
},
{
"default": null,
"name": "TX_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "OPERATION",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "OPERATION_CODE",
"type": [
"null",
{
"connect.type": "int8",
"type": "int"
}
]
},
{
"default": null,
"name": "ROLLBACK",
"type": [
"null",
"boolean"
]
},
{
"default": null,
"name": "SEG_OWNER",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "SEG_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "TABLE_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "SEG_TYPE",
"type": [
"null",
{
"connect.type": "int8",
"type": "int"
}
]
},
{
"default": null,
"name": "SEG_TYPE_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "TABLE_SPACE",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "ROW_ID",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "USERNAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "OS_USERNAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "MACHINE_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "AUDIT_SESSIONID",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SESSION_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SERIAL_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SESSION_INFO",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "THREAD_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SEQUENCE_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "RBASQN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "RBABLK",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "RBABYTE",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "UBAFIL",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "UBABLK",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "UBAREC",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "UBASQN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "ABS_FILE_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "REL_FILE_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "DATA_BLK_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "DATA_OBJ_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "DATA_OBJV_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "DATA_OBJD_NUM",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SQL_REDO",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "SQL_UNDO",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "RS_ID",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "SSN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "CSF",
"type": [
"null",
"boolean"
]
},
{
"default": null,
"name": "INFO",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "STATUS",
"type": [
"null",
"int"
]
},
{
"default": null,
"name": "REDO_VALUE",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "UNDO_VALUE",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SAFE_RESUME_SCN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "CSCN",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "OBJECT_ID",
"type": [
"null",
"bytes"
]
},
{
"default": null,
"name": "EDITION_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "CLIENT_ID",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "SRC_CON_NAME",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "SRC_CON_ID",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SRC_CON_UID",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SRC_CON_DBID",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "SRC_CON_GUID",
"type": [
"null",
"bytes"
]
},
{
"default": null,
"name": "CON_ID",
"type": [
"null",
"boolean"
]
}
],
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"type": "record"
}
Kafka record key for a new insert after Oracle CDC Connector is running
CUSTOMERS
Kafka record value for a new insert after Oracle CDC Connector is running
{
"SCN": {
"long": 584627
},
"START_SCN": {
"long": 584627
},
"COMMIT_SCN": {
"long": 584628
},
"TIMESTAMP": {
"long": 1634860133000
},
"START_TIMESTAMP": {
"long": 1634860133000
},
"COMMIT_TIMESTAMP": {
"long": 1634860133000
},
"XIDUSN": {
"long": 10
},
"XIDSLT": {
"long": 12
},
"XIDSQN": {
"long": 434
},
"XID": {
"bytes": "\n\u0000\f\u0000²\u0001\u0000\u0000"
},
"PXIDUSN": {
"long": 10
},
"PXIDSLT": {
"long": 12
},
"PXIDSQN": {
"long": 434
},
"PXID": {
"bytes": "\n\u0000\f\u0000²\u0001\u0000\u0000"
},
"TX_NAME": null,
"OPERATION": {
"string": "INSERT"
},
"OPERATION_CODE": {
"int": 1
},
"ROLLBACK": {
"boolean": false
},
"SEG_OWNER": {
"string": "ADMIN"
},
"SEG_NAME": {
"string": "CUSTOMERS"
},
"TABLE_NAME": {
"string": "CUSTOMERS"
},
"SEG_TYPE": {
"int": 2
},
"SEG_TYPE_NAME": {
"string": "TABLE"
},
"TABLE_SPACE": {
"string": "USERS"
},
"ROW_ID": {
"string": "AAAFhYAAAAAAAMcAAE"
},
"USERNAME": {
"string": "ADMIN"
},
"OS_USERNAME": {
"string": "UNKNOWN"
},
"MACHINE_NAME": {
"string": "UNKNOWN"
},
"AUDIT_SESSIONID": {
"long": 50049
},
"SESSION_NUM": {
"long": 1269
},
"SERIAL_NUM": {
"long": 35981
},
"SESSION_INFO": {
"string": "UNKNOWN"
},
"THREAD_NUM": {
"long": 1
},
"SEQUENCE_NUM": {
"long": 4
},
"RBASQN": {
"long": 18
},
"RBABLK": {
"long": 311
},
"RBABYTE": {
"long": 16
},
"UBAFIL": {
"long": 3
},
"UBABLK": {
"long": 19960
},
"UBAREC": {
"long": 8
},
"UBASQN": {
"long": 89
},
"ABS_FILE_NUM": {
"long": 4
},
"REL_FILE_NUM": {
"long": 0
},
"DATA_BLK_NUM": {
"long": 796
},
"DATA_OBJ_NUM": {
"long": 22616
},
"DATA_OBJV_NUM": {
"long": 1
},
"DATA_OBJD_NUM": {
"long": 22616
},
"SQL_REDO": {
"string": "insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('5','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-10-21 09:48:53.061'));"
},
"SQL_UNDO": {
"string": "delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '5' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-10-21 09:48:53.061') and ROWID = 'AAAFhYAAAAAAAMcAAE';"
},
"RS_ID": {
"string": " 0x000012.00000137.0010 "
},
"SSN": {
"long": 4
},
"CSF": {
"boolean": false
},
"INFO": null,
"STATUS": {
"int": 0
},
"REDO_VALUE": {
"long": 3616
},
"UNDO_VALUE": {
"long": 3617
},
"SAFE_RESUME_SCN": {
"long": 0
},
"CSCN": {
"long": 584628
},
"OBJECT_ID": null,
"EDITION_NAME": null,
"CLIENT_ID": {
"string": "UNKNOWN"
},
"SRC_CON_NAME": {
"string": "ORCL"
},
"SRC_CON_ID": {
"long": 0
},
"SRC_CON_UID": {
"long": 0
},
"SRC_CON_DBID": {
"long": 0
},
"SRC_CON_GUID": null,
"CON_ID": {
"boolean": false
}
}
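To see a key and value like the ones above directly from Kafka, you can use the Avro console consumer with key printing enabled. The following is a minimal sketch: it assumes both the key and value converters are Avro and that the redo log topic is named redo-log-topic, so substitute the name you configured with redo.log.topic.name.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --topic redo-log-topic --from-beginning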
Numeric mapping examples¶
This section includes numeric.mapping
examples for the Oracle CDC connector.
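In each example, "Run a connector with …" means creating (or updating) the connector with the named properties included in its configuration JSON, alongside the other connector properties. For example, one of the cases below adds the following fragment (illustrative only; use the values for the case you are testing):
"numeric.mapping": "best_fit_or_decimal",
"numeric.default.scale": 0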
Number (20,0)
Create a source table.
CREATE TABLE CUSTOMERS (
ID NUMBER(20,0) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(50),
LAST_NAME VARCHAR(50),
EMAIL VARCHAR(100),
GENDER VARCHAR(50),
CLUB_STATUS VARCHAR(8),
COMMENTS VARCHAR(100),
AMOUNT NUMBER(7,2),
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UPDATE_TS TIMESTAMP );
Run a connector with "numeric.mapping": "none".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":"\u0001","FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"bytes":"\u0004Ý"},"CREATE_TS":{"long":1644320281269},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"1960624"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644355646691"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": { "connect.name": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "0" }, "connect.version": 1, "logicalType": "decimal", "precision": 64, "scale": 0, "type": "bytes" } }
Run a connector with "numeric.mapping": "best_fit_or_decimal".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":"\u0001","FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"double":12.45},"CREATE_TS":{"long":1644320281269},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2017526"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644367436253"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": { "connect.name": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "0" }, "connect.version": 1, "logicalType": "decimal", "precision": 64, "scale": 0, "type": "bytes" } }
Run a connector with "numeric.mapping": "best_fit_or_double".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":1.0,"FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"double":12.45},"CREATE_TS":{"long":1644320281269},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2019644"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644367930477"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": "double" }
Number (20)
Create a source table.
CREATE TABLE CUSTOMERS (
ID NUMBER(20) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(50),
LAST_NAME VARCHAR(50),
EMAIL VARCHAR(100),
GENDER VARCHAR(50),
CLUB_STATUS VARCHAR(8),
COMMENTS VARCHAR(100),
AMOUNT NUMBER(7,2),
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UPDATE_TS TIMESTAMP );
Run a connector with "numeric.mapping": "none".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":"\u0001","FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"bytes":"\u0004Ý"},"CREATE_TS":{"long":1644339857874},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2025259"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644368882652"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": { "connect.name": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "0" }, "connect.version": 1, "logicalType": "decimal", "precision": 64, "scale": 0, "type": "bytes" } }
Run a connector with "numeric.mapping": "best_fit_or_decimal".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":"\u0001","FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"double":12.45},"CREATE_TS":{"long":1644339857874},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2027041"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644369288917"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": { "connect.name": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "0" }, "connect.version": 1, "logicalType": "decimal", "precision": 64, "scale": 0, "type": "bytes" } }
Run a connector with "numeric.mapping": "best_fit_or_double".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":1.0,"FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"double":12.45},"CREATE_TS":{"long":1644339857874},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2028468"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644369662892"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": "double" }
Number
Create a source table.
CREATE TABLE CUSTOMERS (
ID NUMBER NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(50),
LAST_NAME VARCHAR(50),
EMAIL VARCHAR(100),
GENDER VARCHAR(50),
CLUB_STATUS VARCHAR(8),
COMMENTS VARCHAR(100),
AMOUNT NUMBER(7,2),
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UPDATE_TS TIMESTAMP );
Run a connector with "numeric.mapping": "none".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
[2022-02-08 17:27:44,190] WARN Ignoring invalid logical type for name: decimal (org.apache.avro.LogicalTypes:120)
{"ID":";\u0017Oe3&*@#34!1M.F \u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000","FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"bytes":"\u0004Ý"},"CREATE_TS":{"long":1644341059564},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2029983"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644370057266"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": { "connect.name": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "127" }, "connect.version": 1, "logicalType": "decimal", "precision": 64, "scale": 127, "type": "bytes" } }
Run a connector with "numeric.mapping": "none" and "numeric.default.scale": 0.
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":"\u0001","FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"bytes":"\u0004Ý"},"CREATE_TS":{"long":1644341059564},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2031001"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644370392927"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": { "connect.name": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "0" }, "connect.version": 1, "logicalType": "decimal", "precision": 64, "scale": 0, "type": "bytes" } }
Run a connector with "numeric.mapping": "best_fit_or_decimal".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":1.0,"FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"double":12.45},"CREATE_TS":{"long":1644341059564},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2034916"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644370704171"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": "double" }
Run a connector with "numeric.mapping": "best_fit_or_decimal" and "numeric.default.scale": 0.
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":"\u0001","FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"double":12.45},"CREATE_TS":{"long":1644341059564},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2035815"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644371007827"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": { "connect.name": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "0" }, "connect.version": 1, "logicalType": "decimal", "precision": 64, "scale": 0, "type": "bytes" } }
Run a connector with "numeric.mapping": "best_fit_or_double".
Record
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.MYUSER.CUSTOMERS --from-beginning
{"ID":1.0,"FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"AMOUNT":{"double":12.45},"CREATE_TS":{"long":1644341059564},"UPDATE_TS":null,"table":{"string":"ORCL.MYUSER.CUSTOMERS"},"scn":{"string":"2038639"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1644371662104"},"row_id":null,"username":null}
Schema
{ "name": "ID", "type": "double" }
SMT Examples¶
This section includes Single Message Transform (SMT) examples for the Oracle CDC Connector.
TimestampConverter SMT¶
Suppose you have the following table:
CREATE TABLE CUSTOMERS (
ID NUMBER(10) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(50),
CLUB_STATUS VARCHAR(8),
COMMENTS VARCHAR(90),
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
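The sample record below comes from a row that already existed in the table when the connector started (op_type "R", a snapshot read). For illustration, such a row could have been created with an insert like the following (a sketch; the actual data load is not shown in this section):
INSERT INTO CUSTOMERS (ID, FIRST_NAME, CLUB_STATUS, COMMENTS)
VALUES (1, 'Rica', 'bronze', 'Universal optimal hierarchy');
COMMIT;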
If you run a connector, you may see a record similar to the following:
{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"long":1636048731469},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"589654"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636049137001"},"row_id":null,"username":null}
The timestamp is represented as "CREATE_TS":{"long":1636048731469}. If you want a timestamp string that follows a
java.text.SimpleDateFormat
pattern instead, use the TimestampConverter SMT
as shown in the following example:
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"transforms.TimestampConverter.field": "CREATE_TS",
"transforms.TimestampConverter.target.type": "string"
TimestampConverter SMT
converts "CREATE_TS":{"long":1636048731469}
to
"CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"}
:
{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"594158"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636050925327"},"row_id":null,"username":null}
InsertField SMT¶
Suppose you want to insert a field that identifies the origin of the records. Add the
InsertField
SMT to the connector configuration as shown
in the following example:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database"
After the connector is up and running, if changes (for example, INSERT
, UPDATE
, or DELETE
) happen in the tables you are watching, you will see
"Origin":{"string":"Oracle Database"}
added to the table-specific topic,
as shown in the following example:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636058280541},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"611987"},"op_type":{"string":"I"},"op_ts":{"string":"1636083480000"},"current_ts":{"string":"1636058282236"},"row_id":{"string":"AAAFhxAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}
If you look at the redo log topic, you will also see
"Origin":{"string":"Oracle Database"}
being added, and this is probably not
what you want:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":611987},"START_SCN":{"long":611987},"COMMIT_SCN":{"long":611988},"TIMESTAMP":{"long":1636083480000},"START_TIMESTAMP":{"long":1636083480000},"COMMIT_TIMESTAMP":{"long":1636083480000},"XIDUSN":{"long":3},"XIDSLT":{"long":33},"XIDSQN":{"long":407},"XID":{"bytes":"\u0003\u0000!\u0000\u0001\u0000\u0000"},"PXIDUSN":{"long":3},"PXIDSLT":{"long":33},"PXIDSQN":{"long":407},"PXID":{"bytes":"\u0003\u0000!\u0000\u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhxAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":55},"RBABLK":{"long":13},"RBABYTE":{"long":208},"UBAFIL":{"long":3},"UBABLK":{"long":169},"UBAREC":{"long":19},"UBASQN":{"long":74},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22641},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22641},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 13:38:00.541'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 13:38:00.541') and ROWID = 'AAAFhxAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x000037.0000000d.00d0 "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":22},"UNDO_VALUE":{"long":23},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":611988},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false},"Origin":{"string":"Oracle Database"}}
You can leverage KIP-585: Filter and Conditional SMTs to apply the SMT to certain topics only, as shown in the following configuration:
"transforms": "InsertField",
"transforms.InsertField.predicate":"has-my-prefix",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database",
"predicates":"has-my-prefix",
"predicates.has-my-prefix.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern":"ORCL.ADMIN.*"
After the connector is up and running, if changes (for example, INSERT
, UPDATE
, or DELETE
) happen in the tables you are interested in, you will see
"Origin":{"string":"Oracle Database"}
added to the table-specific topic:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636060255772},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"617372"},"op_type":{"string":"I"},"op_ts":{"string":"1636085455000"},"current_ts":{"string":"1636060258267"},"row_id":{"string":"AAAFhzAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}
It is important to note that "Origin":{"string":"Oracle Database"}
won’t
show up in the redo log topic:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":617372},"START_SCN":{"long":617371},"COMMIT_SCN":{"long":617373},"TIMESTAMP":{"long":1636085455000},"START_TIMESTAMP":{"long":1636085455000},"COMMIT_TIMESTAMP":{"long":1636085455000},"XIDUSN":{"long":9},"XIDSLT":{"long":31},"XIDSQN":{"long":416},"XID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"PXIDUSN":{"long":9},"PXIDSLT":{"long":31},"PXIDSQN":{"long":416},"PXID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhzAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":61},"RBABLK":{"long":231},"RBABYTE":{"long":140},"UBAFIL":{"long":3},"UBABLK":{"long":2952},"UBAREC":{"long":12},"UBASQN":{"long":70},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22643},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22643},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 14:10:55.772'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 14:10:55.772') and ROWID = 'AAAFhzAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x00003d.000000e7.008c "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":16},"UNDO_VALUE":{"long":17},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":617373},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false}}
PostgreSQL Example¶
This section includes an example of how to move records from Oracle Database to PostgreSQL using the Oracle CDC Source and the JDBC Sink connectors.
Create an Oracle CDC Source connector.
The following configuration takes a snapshot and stores new changes (inserts) in a table-specific topic named
ORCLCDB.C__MYUSER.USERS:
{ "name": "SimpleOracleCDC_DEMO", "config":{ "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector", "name": "SimpleOracleCDC_DEMO", "tasks.max":3, "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "confluent.topic.bootstrap.servers":"localhost:9092", "oracle.server": "localhost", "oracle.port": 1521, "oracle.sid":"ORCLCDB", "oracle.username": "C##MYUSER", "oracle.password": "mypassword", "start.from":"snapshot", "redo.log.topic.name": "redo-log-topic", "redo.log.consumer.bootstrap.servers":"localhost:9092", "table.inclusion.regex": ".*USERS.*", "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}", "connection.pool.max.size": 20, "confluent.topic.replication.factor":1, "lob.topic.name.template":"${databaseName}.${schemaName}.${tableName}.${columnName}", "redo.log.row.fetch.size":1, "numeric.mapping": "best_fit" } }
A sample record in ORCLCDB.C__MYUSER.USERS:
{"ID":241,"FIRST_NAME":{"string":"Lettie"},"LAST_NAME":{"string":"Kaplan"},"EMAIL":{"string":"Lettie.Kaplan@utvel.us"},"GENDER":{"string":"male"},"CLUB_STATUS":{"string":"active"},"COMMENTS":{"string":"Confluent"},"UPDATE_TS":{"long":1623831883974},"table":{"string":"ORCLCDB.C##MYUSER.USERS"},"scn":{"string":"1450183"},"op_type":{"string":"I"},"op_ts":{"string":"1623857084000"},"current_ts":{"string":"1623831886610"},"row_id":{"string":"AAAR9JAAHAAAACFAAg"},"username":{"string":"C##MYUSER"}}
Create a JDBC Sink connector.
You can use a Single Message Transform (SMT) to drop the ORCLCDB.C__MYUSER. prefix, enabling the connector to upsert records to a USERS table in PostgreSQL, as shown in the following example:
{
  "name": "jdbc_sink_postgres_demo",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://<postgres-url>:5432/<database-name>",
    "connection.user": "<user>",
    "connection.password": "<password>",
    "tasks.max": "2",
    "topics": "ORCLCDB.C__MYUSER.USERS",
    "auto.create": "true",
    "auto.evolve": "true",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "ID",
    "batch.size": 1,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "ORCLCDB.C__MYUSER.(.*)",
    "transforms.dropPrefix.replacement": "$1",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.replication.factor": "1"
  }
}
Check PostgreSQL and verify the data looks similar to the following:
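For example, you can query the auto-created table with psql. This is a sketch with placeholder connection details; assuming the JDBC Sink connector quotes identifiers (its default behavior), the table is created as "USERS" and must be quoted in the query.
psql -h <postgres-url> -U <user> -d <database-name> -c 'SELECT "ID", "FIRST_NAME", "LAST_NAME", "EMAIL" FROM "USERS" LIMIT 5;'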