Testing Scenarios and Examples for Oracle CDC Source Connector for Confluent Platform

The following sections provide testing scenarios and examples for running the Kafka Connect Oracle CDC Source connector.

Testing scenarios

After installing the connector, you can complete the following testing scenarios before running the connector in a production environment.

Capturing Redo logs only

  1. If not running, start Confluent Platform.

    confluent local services start
    
  2. Create the following connector configuration JSON file and save the file as config1.json.

    Note the following configuration property entries:

    • Configure the connector with a new name.
    • Set table.topic.name.template to an empty string.
    • Set table.inclusion.regex to capture several tables.
    • (Optional) Use redo.log.topic.name to rename the redo log topic.
    • (Optional) Set redo.log.corruption.topic to specify the topic where you want to record corrupted records.

    {
      "name": "SimpleOracleCDC_1",
      "config":{
        "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
        "name": "SimpleOracleCDC_1",
        "tasks.max":1,
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "oracle.server": "<database-url>",
        "oracle.port": 1521,
        "oracle.sid":"<SID of the CDB>",
        "oracle.pdb.name":"<name of the PDB where tables reside>",
        "oracle.username": "<username e.g. C##MYUSER>",
        "oracle.password": "<password>",
        "start.from":"snapshot",
        "redo.log.topic.name": "redo-log-topic-1",
        "table.inclusion.regex":"<regex-expression>",
        "_table.topic.name.template_":"Set to an empty string to disable generating change event records",
        "table.topic.name.template": "",
        "connection.pool.max.size": 20,
        "confluent.topic.replication.factor":1,
        "topic.creation.groups": "redo",
        "topic.creation.redo.include": "redo-log-topic-1",
        "topic.creation.redo.replication.factor": 3,
        "topic.creation.redo.partitions": 1,
        "topic.creation.redo.cleanup.policy": "delete",
        "topic.creation.redo.retention.ms": 1209600000,
        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 5,
        "topic.creation.default.cleanup.policy": "compact"
      }
    }
    
  3. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config1.json http://localhost:8083/connectors | jq
    
  4. Enter the following command to get the connector status:

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_1/status | jq
    
  5. Verify the following connector operations are successful:

    • The connector is started with one running task (see the following note).
    • The connector produces records whenever DML events (INSERT, UPDATE, and DELETE) occur for captured tables.
    • The connector does not produce records for tables that were not included in regex or were explicitly excluded with table.exclusion.regex.
    • If the redo.log.corruption.topic is configured, the connector sends corrupted records to the specified corruption topic.

    Note

    If using the property "start.from":"snapshot", the redo log topic contains only database operations completed after the connector starts.

  6. Enter the following command to check Kafka topics:

    kafka-topics --list --bootstrap-server localhost:9092
    

    If there are operations on the tables after the connector starts, you should see the topic configured by the redo.log.topic.name property. If no operations have occurred, only internal topics are displayed.

  7. Consume records using the Avro console consumer.

    kafka-avro-console-consumer --topic redo-log-topic-1 \
    --partition 0 --offset earliest --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081
    

    If there are operations on the tables after the connector starts, you should see records displayed. If no operations have occurred, there should be no records.

  8. Check for errors in the log:

    confluent local services connect log | grep "ERROR"
    
  9. After you finish testing, enter the following command to clean up the running configuration:

    confluent local services destroy
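
Steps 4 and 5 of this scenario check the task count by reading the status JSON by eye. As a rough sketch, the count could also be extracted with jq, which the curl commands above already use. The helper name running_task_count and the sample status document are illustrative assumptions; the sample follows the usual shape of a Kafka Connect status response (a tasks array whose entries carry a state field).

```shell
#!/bin/sh
# Hypothetical helper: count the tasks that report the RUNNING state.
# Reads a Kafka Connect status document (JSON) on standard input.
running_task_count() {
  jq '[.tasks[] | select(.state == "RUNNING")] | length'
}

# Illustrative sample shaped like a Connect status response (assumed values).
sample_status='{"name":"SimpleOracleCDC_1","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:8083"}],"type":"source"}'

# Run the demo only when jq is installed.
if command -v jq >/dev/null 2>&1; then
  printf '%s\n' "$sample_status" | running_task_count
fi
```

Against a live worker, the same helper could be fed from the status command in step 4, for example: curl -s http://localhost:8083/connectors/SimpleOracleCDC_1/status | running_task_count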
    

Capturing Redo logs and Change Event logs

  1. If not running, start Confluent Platform.

    confluent local services start
    
  2. Create the following connector configuration JSON file and save the file as config2.json.

    {
     "name": "SimpleOracleCDC_2",
     "config":{
       "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
       "name": "SimpleOracleCDC_2",
       "tasks.max":3,
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "http://localhost:8081",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter.schema.registry.url": "http://localhost:8081",
       "confluent.topic.bootstrap.servers":"localhost:9092",
       "oracle.server": "<database-url>",
       "oracle.port": 1521,
       "oracle.sid":"<SID of the CDB>",
       "oracle.pdb.name":"<name of the PDB where tables reside>",
       "oracle.username": "<username e.g. C##MYUSER>",
       "oracle.password": "<password>",
       "start.from":"snapshot",
       "redo.log.topic.name": "redo-log-topic-2",
       "redo.log.consumer.bootstrap.servers":"localhost:9092",
       "table.inclusion.regex":"<regex-expression>",
       "_table.topic.name.template_":"Using template vars to set change event topic for each table",
       "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
       "connection.pool.max.size": 20,
       "confluent.topic.replication.factor":1,
       "topic.creation.groups": "redo",
       "topic.creation.redo.include": "redo-log-topic-2",
       "topic.creation.redo.replication.factor": 3,
       "topic.creation.redo.partitions": 1,
       "topic.creation.redo.cleanup.policy": "delete",
       "topic.creation.redo.retention.ms": 1209600000,
       "topic.creation.default.replication.factor": 3,
       "topic.creation.default.partitions": 5,
       "topic.creation.default.cleanup.policy": "compact"
     }
    }
    
  3. Create redo-log-topic-2. Make sure the topic name matches the value you put for "redo.log.topic.name".

    kafka-topics --create --topic redo-log-topic-2 \
    --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  4. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config2.json http://localhost:8083/connectors | jq
    
  5. Enter the following command to get the connector status:

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_2/status | jq
    
  6. Verify the connector starts with three running tasks.

  7. Perform INSERT, UPDATE, and DELETE row operations for each table and verify the following expected results:

    • The connector creates a redo log topic and change event log topics for each captured table.
    • Redo log events are generated starting from the current time.
    • The change event log for each table contains snapshot events (op_type=R) followed by other types of events.

  8. Enter the following command to check Kafka topics:

    kafka-topics --list --bootstrap-server localhost:9092
    

    You should see the topic configured by the redo.log.topic.name property and topics in the form of ${databaseName}.${schemaName}.${tableName}.

  9. Consume records using the Avro console consumer.

    kafka-avro-console-consumer --topic redo-log-topic-2 \
    --partition 0 --offset earliest --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081
    

    You should see change event records with op_type=I (insert), op_type=U (update), or op_type=D (delete).

  10. Check for errors in the log.

    confluent local services connect log | grep "ERROR"
    
  11. After you finish testing, enter the following command to clean up the running configuration:

    confluent local services destroy
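
Step 8 expects topics named after the table.topic.name.template value. To preview which names a template would produce, the substitution can be sketched in shell. This only illustrates the naming pattern and is not the connector's implementation; the helper name expand_topic_template and the database, schema, and table names are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: expand a table.topic.name.template value for one table.
# Arguments: template, database name, schema name, table name.
# The name arguments must not contain "/" or "&" (sed replacement syntax).
expand_topic_template() {
  printf '%s\n' "$1" | sed \
    -e 's/[$]{databaseName}/'"$2"'/g' \
    -e 's/[$]{schemaName}/'"$3"'/g' \
    -e 's/[$]{tableName}/'"$4"'/g'
}

# Illustrative values only; prints ORCLPDB1.MYSCHEMA.CUSTOMERS
expand_topic_template '${databaseName}.${schemaName}.${tableName}' ORCLPDB1 MYSCHEMA CUSTOMERS
```

The template is passed in single quotes so that the shell leaves the ${...} variables for the helper to substitute.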
    

Starting from a specific SCN (without snapshot)

  1. If not running, start Confluent Platform.

    confluent local services start
    
  2. Create the following connector configuration JSON file. Save the JSON file using the name config3.json.

    Choose an Oracle System Change Number (SCN) or timestamp that is covered by at least one available redo log. That log must apply to one of the included tables. To query the current SCN of the database, run SELECT CURRENT_SCN FROM v$database;

    {
     "name": "SimpleOracleCDC_3",
     "config":{
       "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
       "name": "SimpleOracleCDC_3",
       "tasks.max":3,
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "http://localhost:8081",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter.schema.registry.url": "http://localhost:8081",
       "confluent.topic.bootstrap.servers":"localhost:9092",
       "oracle.server": "<database-url>",
       "oracle.port": 1521,
       "oracle.sid":"<SID of the CDB>",
       "oracle.pdb.name":"<name of the PDB where tables reside>",
       "oracle.username": "<username e.g. C##MYUSER>",
       "oracle.password": "<password>",
       "_start.from_":"Set to a valid SCN or timestamp to start without snapshotting tables",
       "start.from":"<SCN>",
       "redo.log.topic.name": "redo-log-topic-3",
       "redo.log.consumer.bootstrap.servers":"localhost:9092",
       "table.inclusion.regex":"<regex-expression>",
       "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
       "connection.pool.max.size": 20,
       "confluent.topic.replication.factor":1,
       "topic.creation.groups": "redo",
       "topic.creation.redo.include": "redo-log-topic-3",
       "topic.creation.redo.replication.factor": 3,
       "topic.creation.redo.partitions": 1,
       "topic.creation.redo.cleanup.policy": "delete",
       "topic.creation.redo.retention.ms": 1209600000,
       "topic.creation.default.replication.factor": 3,
       "topic.creation.default.partitions": 5,
       "topic.creation.default.cleanup.policy": "compact"
     }
    }
    
  3. Create redo-log-topic-3. Make sure the topic name matches the value you put for "redo.log.topic.name".

    kafka-topics --create --topic redo-log-topic-3 \
    --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  4. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config3.json http://localhost:8083/connectors | jq
    
  5. Enter the following command to get the connector status:

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_3/status | jq
    

    Verify that the connector starts with three running tasks.

  6. Perform INSERT, UPDATE, and DELETE row operations for each table and verify the following expected results:

    • The connector creates a redo log topic and change event log topics for each captured table.
    • Redo log events are generated starting from the current time.
    • The change event log for each table starts from the specified start.from value and does not contain snapshot events (op_type=R).

  7. Enter the following command to check Kafka topics:

    kafka-topics --list --bootstrap-server localhost:9092
    

    You should see the topic configured by the redo.log.topic.name property and topics in the form of ${databaseName}.${schemaName}.${tableName}.

  8. Consume records using the Avro console consumer.

    kafka-avro-console-consumer --topic redo-log-topic-3 \
    --partition 0 --offset earliest --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081
    

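    You should see records for operations that occurred after the specified SCN. Because the connector did not snapshot the tables, the table-specific topics contain no snapshot records (op_type=R).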

  9. Check for errors in the log.

    confluent local services connect log | grep "ERROR"
    
  10. After you finish testing, enter the following command to clean up the running configuration:

    confluent local services destroy
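
The expected results in step 6 say that the change event topics contain no snapshot events (op_type=R). One way to check this is to tally consumer output by op_type. The sketch below assumes one JSON record per line with an op_type field, either as a plain string or wrapped as {"string": ...} the way Avro JSON output renders nullable fields. The helper name count_op_type and the sample records (ID, NAME) are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: count the input lines whose op_type equals $1.
# Reads one JSON record per line on standard input.
count_op_type() {
  # grep -c exits non-zero when the count is 0, so mask that status.
  grep -cE "\"op_type\": ?(\\{\"string\": ?)?\"$1\"" || true
}

# Illustrative records (assumed shape, not captured from a real topic).
sample_records='{"ID":1,"NAME":"a","op_type":{"string":"I"}}
{"ID":1,"NAME":"b","op_type":{"string":"U"}}
{"ID":1,"NAME":"b","op_type":{"string":"D"}}'

# Print one count per operation type; R stays at 0 when no snapshot ran.
for op in R I U D; do
  printf '%s=%s\n' "$op" "$(printf '%s\n' "$sample_records" | count_op_type "$op")"
done
```

With real data, the input would come from the Avro console consumer reading one of the table-specific topics rather than from the sample variable.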
    

Capturing LOB type columns

The following example shows how to capture LOB type columns.

Prerequisites

  • A table that contains columns stored as LOB (BLOB, CLOB, or NCLOB) data type.
  • The table must have a primary key.

Procedure

  1. If not running, start Confluent Platform.

    confluent local services start
    
  2. Create the following connector configuration JSON file and save it as config4.json.

    {
           "name": "SimpleOracleCDC_4",
           "config":{
             "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
             "name": "SimpleOracleCDC_4",
             "tasks.max":2,
             "key.converter": "io.confluent.connect.avro.AvroConverter",
             "key.converter.schema.registry.url": "http://localhost:8081",
             "value.converter": "io.confluent.connect.avro.AvroConverter",
             "value.converter.schema.registry.url": "http://localhost:8081",
             "confluent.topic.bootstrap.servers":"localhost:9092",
             "oracle.server": "<database-url>",
             "oracle.port": 1521,
             "oracle.sid":"<SID of the CDB>",
             "oracle.pdb.name":"<name of the PDB where tables reside>",
             "oracle.username": "<username e.g. C##MYUSER>",
             "oracle.password": "<password>",
             "start.from":"snapshot",
             "redo.log.topic.name": "redo-log-topic-4",
             "redo.log.consumer.bootstrap.servers":"localhost:9092",
             "table.inclusion.regex":"<regex-expression>",
             "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
             "_lob.topic.name.template_": "Using template vars to set lob topic for each table",
             "lob.topic.name.template": "${tableName}.${columnName}_topic",
             "connection.pool.max.size": 20,
             "confluent.topic.replication.factor":1,
             "topic.creation.groups": "redo",
             "topic.creation.redo.include": "redo-log-topic-4",
             "topic.creation.redo.replication.factor": 3,
             "topic.creation.redo.partitions": 1,
             "topic.creation.redo.cleanup.policy": "delete",
             "topic.creation.redo.retention.ms": 1209600000,
             "topic.creation.default.replication.factor": 3,
             "topic.creation.default.partitions": 5,
             "topic.creation.default.cleanup.policy": "compact"
           }
    }
    
  3. Create redo-log-topic-4. Make sure the topic name matches the value you put for "redo.log.topic.name".

    kafka-topics --create --topic redo-log-topic-4 \
    --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  4. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config4.json http://localhost:8083/connectors | jq
    
  5. Enter the following command and verify that the connector starts with two running tasks.

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_4/status | jq
    
  6. Perform INSERT, UPDATE, and DELETE row operations for each table and verify the following expected results:

    • The redo log topic is created.

    • Change event topics are created for each captured table.

    • LOB topics are created for each LOB column.

    • The key of each LOB topic record contains the following information:

      {
        "table": "dot.separated.fully.qualified.table.name",
        "column": "column.name.of.LOB.column",
        "primary_key": "primary.key.of.change.event.topic.after.applying.${key.template}"
      }
      
    • The value of the LOB topic is the LOB value.

    • When a row is deleted from the table, the corresponding LOB is deleted from the LOB topic. The connector writes a tombstone record (null value) to the LOB topic.

  7. After you finish testing, enter the following command to clean up the running configuration:

    confluent local services destroy
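
The last expected result in step 6 describes tombstone records: a null value for a key marks that LOB as deleted. The following sketch replays a small stream of key and value pairs to show which entries survive. It simplifies the record key to a single string, and the literal text null stands in for a real tombstone; the helper name latest_lob_values and the sample keys are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: replay key<TAB>value lines in order and print the
# surviving entries. A value of "null" stands in for a tombstone record.
latest_lob_values() {
  awk -F '\t' '
    $2 == "null" { delete latest[$1]; next }   # tombstone: drop the key
    { latest[$1] = $2 }                        # otherwise keep the newest value
    END { for (k in latest) print k "\t" latest[k] }
  ' | sort
}

# Row 1 is updated once; row 2 is written and then deleted.
printf 'CUSTOMERS.BIO.1\tfirst draft\nCUSTOMERS.BIO.2\tother row\nCUSTOMERS.BIO.1\tsecond draft\nCUSTOMERS.BIO.2\tnull\n' \
  | latest_lob_values
```

Only the entry for CUSTOMERS.BIO.1 remains, carrying its most recent value, which mirrors what a reader of the LOB topic would treat as the current state.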
    

Using Kerberos authentication

Use the following example to set up the connector to use Kerberos version 5 (v5). The example uses the redo logs to show how the configuration is set up.

Note

  • This connector only supports authentication with Kerberos v5.
  • The previous examples in this section can also be used to check out Kerberos v5 functionality. Ensure you verify the following prerequisites and add the additional Kerberos property to the connector configuration.

Prerequisites

  • You must enable Kerberos on your Oracle Database instance. For instructions, see Configuring Kerberos Authentication. If your Oracle Database conforms to multi-tenant architecture (that is, has a container and pluggable databases), ensure the connector user account created is a common user and you set the appropriate user privileges for the user.
  • A Kerberos Key Distribution Center (KDC) must be configured within the domain.
  • The KDC must be accessible from the Connect workers. To do this, ensure the krb5.conf file in each Connect worker correctly maps your realm to the KDC.
  • The Kerberos user principal for Connect must exist in the KDC, and its keytab file must be copied to each Connect worker.

Procedure

  1. In your Connect worker, run the following command:

    kinit -kt /path/to/the/keytab --renewable -f <your-connect-user-principal>
    

    The --renewable and -f flags are required because a long-running connector must renew the ticket-granting ticket (TGT), and the tickets must be forwardable (-f).

    Tip

    You can add the flag -c /path/to/credentials/cache/file to save your credentials cache file in a preferred location. The credentials cache file is the only requirement for authenticating the connector to Oracle using Kerberos.

  2. If not running, start Confluent Platform.

    confluent local services start
    
  3. Create the following connector configuration JSON file and save it as config5.json.

    Note

    The Oracle CDC Source connector uses the oracle.kerberos.cache.file configuration property to specify the path to the cache file generated previously.

    {
           "name": "SimpleOracleCDC_5",
           "config":{
             "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
             "name": "SimpleOracleCDC_5",
             "tasks.max":1,
             "key.converter": "io.confluent.connect.avro.AvroConverter",
             "key.converter.schema.registry.url": "http://localhost:8081",
             "value.converter": "io.confluent.connect.avro.AvroConverter",
             "value.converter.schema.registry.url": "http://localhost:8081",
             "confluent.topic.bootstrap.servers":"localhost:9092",
             "oracle.server": "<database-url>",
             "oracle.port": 1521,
             "oracle.sid":"<SID of the CDB>",
             "oracle.pdb.name":"<name of the PDB where tables reside>",
             "oracle.username": "<C##MYUSER>",
             "oracle.password": "",
             "start.from":"snapshot",
             "redo.log.topic.name": "redo-log-topic-5",
             "table.inclusion.regex":"<regex-expression>",
             "_table.topic.name.template_":"Set to an empty string to disable generating change event records",
             "table.topic.name.template": "",
             "connection.pool.max.size": 20,
             "confluent.topic.replication.factor":1,
             "topic.creation.groups": "redo",
             "topic.creation.redo.include": "redo-log-topic-5",
             "topic.creation.redo.replication.factor": 3,
             "topic.creation.redo.partitions": 1,
             "topic.creation.redo.cleanup.policy": "delete",
             "topic.creation.redo.retention.ms": 1209600000,
             "topic.creation.default.replication.factor": 3,
             "topic.creation.default.partitions": 5,
             "topic.creation.default.cleanup.policy": "compact",
             "oracle.kerberos.cache.file": "</path/to/my/kerberos/credentials-file>"
           }
    }
    
  4. Create redo-log-topic-5. Make sure the topic name matches the value you put for "redo.log.topic.name".

    kafka-topics --create --topic redo-log-topic-5 \
    --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  5. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config5.json http://localhost:8083/connectors | jq
    
  6. Enter the following command and verify that the connector starts with one running task.

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_5/status | jq
    
  7. Perform INSERT, UPDATE, and DELETE row operations for each table and verify the following expected results:

    • The connector starts and has one running task.
    • The connector writes redo log records to the redo log topic. Because table.topic.name.template is empty, no change event topics are created.
    • The connector does not produce records for tables that were not included in regex or were explicitly excluded using the table.exclusion.regex property.
    • If the redo.log.corruption.topic property was configured, the connector sends corrupted records to the specified corruption topic.
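
Because the connector depends on the credentials cache file named in oracle.kerberos.cache.file, a quick pre-flight check on each Connect worker can catch a wrong path before the connector is submitted. The sketch below only tests that the file exists, is readable, and is not empty; it does not validate the ticket itself. The helper name check_ccache is hypothetical, and the demo uses a temporary placeholder file rather than a real cache.

```shell
#!/bin/sh
# Hypothetical pre-flight check: confirm that the credentials cache file
# exists, is readable, and is not empty.
check_ccache() {
  if [ -r "$1" ] && [ -s "$1" ]; then
    echo "ok: $1"
  else
    echo "missing, unreadable, or empty: $1" >&2
    return 1
  fi
}

# Demo with a temporary placeholder file (not a real Kerberos cache).
demo_cache=$(mktemp)
printf 'placeholder' > "$demo_cache"
check_ccache "$demo_cache"
rm -f "$demo_cache"
```

In practice the argument would be the same path that you pass to kinit with the -c flag and set in oracle.kerberos.cache.file.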

Using SSL for encrypted communication and server/client authentication

Use the following example to set up the connector to use SSL with one-way or two-way (mutual) authentication. The example uses the redo logs to show how the configuration is set up.

Prerequisites

Procedure

  1. If not running, start Confluent Platform.

    confluent local services start
    
  2. Create the following connector configuration JSON file and save it as config6.json.

    Note

    Note the following properties (shown in the example):

    • The Oracle CDC Source connector uses the oracle.ssl.truststore.file and oracle.ssl.truststore.password properties to specify the location of the truststore containing the trusted server certificate and the truststore password.
    • The passthrough properties oracle.connection.javax.net.ssl.keyStore and oracle.connection.javax.net.ssl.keyStorePassword are also used to supply the keystore location and password.

    {
           "name": "SimpleOracleCDC_6",
           "config":{
             "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
             "name": "SimpleOracleCDC_6",
             "tasks.max":1,
             "key.converter": "io.confluent.connect.avro.AvroConverter",
             "key.converter.schema.registry.url": "http://localhost:8081",
             "value.converter": "io.confluent.connect.avro.AvroConverter",
             "value.converter.schema.registry.url": "http://localhost:8081",
             "confluent.topic.bootstrap.servers":"localhost:9092",
             "oracle.server": "<database-url>",
             "oracle.port": 1521,
             "oracle.sid":"<SID of the CDB>",
             "oracle.pdb.name":"<name of the PDB where tables reside>",
             "oracle.username": "<C##MYUSER>",
             "oracle.password": "<password>",
             "start.from":"snapshot",
             "redo.log.topic.name": "redo-log-topic-6",
             "table.inclusion.regex":"<regex-expression>",
             "_table.topic.name.template_":"Set to an empty string to disable generating change event records",
             "table.topic.name.template": "",
             "connection.pool.max.size": 20,
             "confluent.topic.replication.factor":1,
             "topic.creation.groups": "redo",
             "topic.creation.redo.include": "redo-log-topic-6",
             "topic.creation.redo.replication.factor": 3,
             "topic.creation.redo.partitions": 1,
             "topic.creation.redo.cleanup.policy": "delete",
             "topic.creation.redo.retention.ms": 1209600000,
             "topic.creation.default.replication.factor": 3,
             "topic.creation.default.partitions": 5,
             "topic.creation.default.cleanup.policy": "compact",
             "oracle.ssl.truststore.file": "</path/to/truststore/file/containing/server/certs>",
             "oracle.ssl.truststore.password": "<password>",
             "oracle.connection.javax.net.ssl.keyStore": "</path/to/keystore/file>",
             "oracle.connection.javax.net.ssl.keyStorePassword": "<password>"
           }
    }
    
  3. Create redo-log-topic-6. Make sure the topic name matches the value you put for "redo.log.topic.name".

    kafka-topics --create --topic redo-log-topic-6 \
    --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  4. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config6.json http://localhost:8083/connectors | jq
    
  5. Enter the following command and verify that the connector starts with one running task.

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_6/status | jq
    
  6. Perform INSERT, UPDATE, and DELETE row operations for each table and verify the following expected results:

    • The connector starts and has one running task.
    • The connector writes redo log records to the redo log topic. Because table.topic.name.template is empty, no change event topics are created.
    • The connector does not produce records for tables that were not included in regex or were explicitly excluded using the table.exclusion.regex property.
    • If the redo.log.corruption.topic property was configured, the connector sends corrupted records to the specified corruption topic.
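
Step 3 asks you to keep the topic name in sync with redo.log.topic.name, and the configuration also names the topic in topic.creation.redo.include. A small check can catch a copy-and-paste mismatch between the two before the connector is submitted. The sketch assumes one property per line, as in the examples on this page, and compares the two values literally, which is stricter than necessary because the include property accepts regular expressions. The helper names config_value and check_redo_topic are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: print the string value of a "key": "value" line in a
# connector JSON file. Assumes one property per line.
config_value() {
  sed -n 's/^[[:space:]]*"'"$2"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' "$1" | head -n 1
}

# Hypothetical check: compare the redo log topic name with the include entry.
check_redo_topic() {
  name=$(config_value "$1" 'redo.log.topic.name')
  include=$(config_value "$1" 'topic.creation.redo.include')
  if [ "$name" = "$include" ]; then
    echo "match: $name"
  else
    echo "mismatch: redo.log.topic.name=$name topic.creation.redo.include=$include"
    return 1
  fi
}

# Demo against a trimmed-down configuration written to a temporary file.
demo_config=$(mktemp)
cat > "$demo_config" <<'EOF'
{
  "name": "SimpleOracleCDC_6",
  "config": {
    "redo.log.topic.name": "redo-log-topic-6",
    "topic.creation.redo.include": "redo-log-topic-6"
  }
}
EOF
check_redo_topic "$demo_config"
rm -f "$demo_config"
```

The same check could be pointed at any of the configN.json files created in these scenarios.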

Connecting to an Oracle RAC cluster using service name

The following example demonstrates how to configure the connector to connect to an Oracle RAC cluster using the service name instead of SID.

Prerequisites

  • Oracle RAC cluster configured with a service name and multiple database instances

Procedure

  1. If not running, start Confluent Platform.

    confluent local services start
    
  2. Create the following connector configuration JSON file and save it as config7.json.

    Note

    • The oracle.service.name property specifies the service name to use when connecting to RAC.
    • The oracle.sid property is still required. It can be the SID of any of the database instances.

    {
           "name": "SimpleOracleCDC_7",
           "config":{
             "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
             "name": "SimpleOracleCDC_7",
             "tasks.max":1,
             "key.converter": "io.confluent.connect.avro.AvroConverter",
             "key.converter.schema.registry.url": "http://localhost:8081",
             "value.converter": "io.confluent.connect.avro.AvroConverter",
             "value.converter.schema.registry.url": "http://localhost:8081",
             "confluent.topic.bootstrap.servers":"localhost:9092",
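             "oracle.server": "<database-url>",
             "oracle.port": 1521,
             "oracle.sid":"<SID of any database instance>",
             "oracle.service.name":"<service-name>",
             "oracle.pdb.name":"<name of the PDB where tables reside>",
             "oracle.username": "<C##MYUSER>",
             "oracle.password": "<password>",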
             "start.from":"snapshot",
             "redo.log.topic.name": "redo-log-topic-7",
             "table.inclusion.regex":"<regex-expression>",
             "_table.topic.name.template_":"Set to an empty string to disable generating change event records",
             "table.topic.name.template": "",
             "connection.pool.max.size": 20,
             "confluent.topic.replication.factor":1,
             "topic.creation.groups": "redo",
             "topic.creation.redo.include": "redo-log-topic-7",
             "topic.creation.redo.replication.factor": 3,
             "topic.creation.redo.partitions": 1,
             "topic.creation.redo.cleanup.policy": "delete",
             "topic.creation.redo.retention.ms": 1209600000,
             "topic.creation.default.replication.factor": 3,
             "topic.creation.default.partitions": 5,
             "topic.creation.default.cleanup.policy": "compact"
           }
    }
    
  3. Create redo-log-topic-7. Make sure the topic name matches the value you put for "redo.log.topic.name".

    kafka-topics --create --topic redo-log-topic-7 \
    --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  4. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config7.json http://localhost:8083/connectors | jq
    
  5. Enter the following command and verify that the connector starts with one running task.

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_7/status | jq
    
  6. Perform INSERT, UPDATE, and DELETE row operations for each table and verify the following expected results:

    • The connector starts and has one running task.
    • The connector writes redo log records to the redo log topic. Because table.topic.name.template is empty, no change event topics are created.
    • The connector does not produce records for tables that were not included in regex or were explicitly excluded using the table.exclusion.regex property.
    • If the redo.log.corruption.topic property was configured, the connector sends corrupted records to the specified corruption topic.
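
For orientation, the difference between a SID and a service name is visible in the URL syntax of Oracle's thin JDBC driver. The sketch below prints both forms for a hypothetical host, SID, and service name. It illustrates the driver's URL syntax only; how the connector assembles its connection string internally is not covered here, and the helper names sid_url and service_url are hypothetical.

```shell
#!/bin/sh
# Hypothetical helpers that format Oracle thin JDBC URLs.
# Arguments: host, port, then the SID or the service name.
sid_url() {
  printf 'jdbc:oracle:thin:@%s:%s:%s\n' "$1" "$2" "$3"
}
service_url() {
  printf 'jdbc:oracle:thin:@//%s:%s/%s\n' "$1" "$2" "$3"
}

# Illustrative values only.
sid_url rac-scan.example.com 1521 ORCL1
service_url rac-scan.example.com 1521 orclsvc
```

A SID names one database instance, while a service name can be served by several instances, which is why the service name is the natural identifier for a RAC cluster.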

Capturing Redo logs and Snapshot with Supplemental logging only

  1. If not running, start Confluent Platform.

    confluent local services start
    
  2. Create the following connector configuration JSON file and save the file as config8.json.

    Note the following configuration property entries:

    • Configure the connector with a new name.
    • Set table.inclusion.regex to capture several tables.
    • Set oracle.supplemental.log.level to msl.
    • (Optional) Set redo.log.corruption.topic to specify the topic where you want to record corrupted records.

    {
     "name": "SimpleOracleCDC_8",
     "config":{
       "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
       "name": "SimpleOracleCDC_8",
       "tasks.max": 3,
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "http://localhost:8081",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter.schema.registry.url": "http://localhost:8081",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "oracle.server": "<database-url>",
       "oracle.port": 1521,
       "oracle.sid": "<SID of the CDB>",
       "oracle.pdb.name": "<name of the PDB where tables reside>",
       "oracle.username": "<username e.g. C##MYUSER>",
       "oracle.password": "<password>",
       "oracle.supplemental.log.level": "msl",
       "start.from": "snapshot",
       "redo.log.topic.name": "redo-log-topic-8",
       "redo.log.consumer.bootstrap.servers": "localhost:9092",
       "table.inclusion.regex": "<regex-expression>",
       "_table.topic.name.template_": "Using template vars to set change event topic for each table",
       "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
       "connection.pool.max.size": 20,
       "confluent.topic.replication.factor": 1,
       "topic.creation.groups": "redo",
       "topic.creation.redo.include": "redo-log-topic-8",
       "topic.creation.redo.replication.factor": 3,
       "topic.creation.redo.partitions": 1,
       "topic.creation.redo.cleanup.policy": "delete",
       "topic.creation.redo.retention.ms": 1209600000,
       "topic.creation.default.replication.factor": 3,
       "topic.creation.default.partitions": 5,
       "topic.creation.default.cleanup.policy": "compact"
     }
    }
    
  3. Enter the following command to start the connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @config8.json http://localhost:8083/connectors | jq
    
  4. Enter the following command to get the connector status:

    curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC_8/status | jq
    
  5. Verify the following connector operations are successful:

    • The connector is started with three running tasks (see the following note).
    • The connector produces snapshot records for captured tables.

    Note

    If using the property "start.from":"snapshot", the redo log topic contains only database operations completed after the connector starts.

  6. Enter the following command to check Kafka topics:

    kafka-topics --list --bootstrap-server localhost:9092
    

    You should see the topic configured by the redo.log.topic.name property and topics named in the form ${databaseName}.${schemaName}.${tableName}.

  7. Consume records using the Avro console consumer.

    kafka-avro-console-consumer --topic redo-log-topic-8 \
    --partition 0 --offset earliest --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081
    

    If there are operations on the tables after the connector starts, you should see records displayed. If no operations have occurred, there should be no records.

  8. Check for errors in the log:

    confluent local services connect log | grep "ERROR"
    
  9. After you finish testing, enter the following command to clean up the running configuration:

    confluent local services destroy
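
The status check in steps 4 and 5 can also be scripted. The following Python sketch assumes the standard Kafka Connect REST status response shape (a connector object with a state, plus a tasks array whose entries each have an id and a state). The sample payload is hypothetical rather than captured output, and the helper names running_task_count and is_healthy are illustrative.

```python
import json

# Hypothetical sample shaped like a Kafka Connect REST status response.
SAMPLE_STATUS = json.loads("""
{
  "name": "SimpleOracleCDC_8",
  "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "localhost:8083"},
    {"id": 1, "state": "RUNNING", "worker_id": "localhost:8083"},
    {"id": 2, "state": "RUNNING", "worker_id": "localhost:8083"}
  ],
  "type": "source"
}
""")


def running_task_count(status):
    """Return the number of tasks whose state is RUNNING."""
    return sum(1 for task in status.get("tasks", []) if task.get("state") == "RUNNING")


def is_healthy(status, expected_tasks):
    """True when the connector and the expected number of tasks are RUNNING."""
    connector_running = status.get("connector", {}).get("state") == "RUNNING"
    return connector_running and running_task_count(status) == expected_tasks


print(is_healthy(SAMPLE_STATUS, expected_tasks=3))
```

In practice, the dictionary would come from parsing the output of the status command in step 4 instead of the embedded sample.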
    

Configuration examples for running against Confluent Cloud

The following configuration examples allow you to run the connector against Confluent Cloud.

Note

The configuration examples are based on a connector running locally using Confluent Platform version 6.0 (and later).

Using the Avro converter with Schema Registry

The following worker and connector configuration examples are for running the Oracle CDC Source connector against Confluent Cloud with Confluent Cloud Schema Registry and the Avro (io.confluent.connect.avro.AvroConverter) value converter.

Distributed worker configuration

  1. Create your my-connect-distributed.properties file based on the following example.

    bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=io.confluent.connect.avro.AvroConverter
    
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    request.timeout.ms=20000
    retry.backoff.ms=500
    
    producer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    producer.ssl.endpoint.identification.algorithm=https
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=PLAIN
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    
    consumer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=PLAIN
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    
    offset.flush.interval.ms=10000
    offset.storage.file.filename=/tmp/connect.offsets
    group.id=connect-cluster
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=3
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    
    # Schema Registry specific settings
    # We recommend you use Confluent Cloud Schema Registry if you run Oracle CDC Source against Confluent Cloud
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.schema.registry.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
    value.converter.schema.registry.url=<your-schema-registry-url e.g https://xyz.us-east-2.aws.confluent.cloud>
    
    # Confluent license settings
    confluent.topic.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    confluent.topic.security.protocol=SASL_SSL
    confluent.topic.sasl.mechanism=PLAIN
    
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include
    # any combination of:
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples:
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/share/java,<path-to>/confluent-6.0.0/share/confluent-hub-components
    
  2. Start Kafka Connect with the following command:

    <path-to-confluent-home>/bin/connect-distributed my-connect-distributed.properties
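
The worker properties example contains several angle-bracket placeholders that must be replaced before the worker can connect. As a minimal sketch, the following Python snippet scans properties text for placeholders that are still present. It assumes that real values never contain angle brackets; the function name find_placeholders and the trimmed sample text are illustrative only.

```python
import re

# Matches angle-bracket placeholders such as <kafka-api-key>.
PLACEHOLDER = re.compile(r"<[^<>\n]+>")


def find_placeholders(properties_text):
    """Return (line_number, key, placeholder) for each unreplaced placeholder."""
    findings = []
    for number, line in enumerate(properties_text.splitlines(), start=1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = stripped.partition("=")
        for match in PLACEHOLDER.finditer(value):
            findings.append((number, key.strip(), match.group(0)))
    return findings


# Trimmed sample based on the worker configuration example.
SAMPLE = """\
bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
security.protocol=SASL_SSL
# a comment with <angle brackets> is ignored
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
"""

for finding in find_placeholders(SAMPLE):
    print(finding)
```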
    

Connector configuration

  1. Create your oracle-cdc-confluent-cloud.json file based on the following example:

    {
    
       "name": "OracleCDC_Confluent_Cloud",
       "config":{
          "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
          "name": "OracleCDC_Confluent_Cloud",
          "tasks.max":3,
    
          "oracle.server": "<database-url>",
          "oracle.sid":"<SID of the CDB>",
          "oracle.pdb.name":"<name of the PDB where tables reside. If you don't have PDB, remove this config property>",
          "oracle.username": "<username e.g. C##MYUSER>",
          "oracle.password": "<password>",
          "start.from":"snapshot",
    
          "redo.log.topic.name": "oracle-redo-log-topic",
          "redo.log.consumer.bootstrap.servers":"<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
          "redo.log.consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
          "redo.log.consumer.security.protocol":"SASL_SSL",
          "redo.log.consumer.sasl.mechanism":"PLAIN",
    
          "table.inclusion.regex":"<regex-expression e.g. ORCL[.]ADMIN[.]MARIPOSA.*>",
          "_table.topic.name.template_":"Using template vars to set change event topic for each table",
          "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
          "connection.pool.max.size": 20,
          "confluent.topic.replication.factor":3,
    
          "topic.creation.groups":"redo",
          "topic.creation.redo.include":"oracle-redo-log-topic",
          "topic.creation.redo.replication.factor":3,
          "topic.creation.redo.partitions":1,
          "topic.creation.redo.cleanup.policy":"delete",
          "topic.creation.redo.retention.ms":1209600000,
          "topic.creation.default.replication.factor":3,
          "topic.creation.default.partitions":5,
          "topic.creation.default.cleanup.policy":"compact",
    
          "confluent.topic.bootstrap.servers":"<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
          "confluent.topic.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
          "confluent.topic.security.protocol":"SASL_SSL",
          "confluent.topic.sasl.mechanism":"PLAIN",
    
          "value.converter":"io.confluent.connect.avro.AvroConverter",
          "value.converter.basic.auth.credentials.source":"USER_INFO",
          "value.converter.schema.registry.basic.auth.user.info":"<schema-registry-api-key>:<schema-registry-api-secret>",
          "value.converter.schema.registry.url":"<your-schema-registry-url e.g https://xyz.us-east-2.aws.confluent.cloud>"
    
        }
    }
    
  2. Create oracle-redo-log-topic. Make sure the topic name matches the value you put for "redo.log.topic.name".

    Confluent Platform CLI

    bin/kafka-topics --create --topic oracle-redo-log-topic \
    --bootstrap-server broker:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    

    Confluent Cloud CLI

    confluent kafka topic create oracle-redo-log-topic \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  3. Start the Oracle CDC Source connector with the following command:

    curl -s -H "Content-Type: application/json" -X POST -d @oracle-cdc-confluent-cloud.json http://localhost:8083/connectors/ | jq
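
The connector configuration above selects tables with table.inclusion.regex and names change event topics with table.topic.name.template. The following Python sketch illustrates how the two settings relate. It assumes the regular expression must match the entire fully qualified table name, and it uses Python's string.Template only because it shares the ${name} placeholder syntax; it is not the connector's implementation, and the table names are hypothetical.

```python
import re
from string import Template

# Values copied from the connector configuration example.
INCLUSION_REGEX = "ORCL[.]ADMIN[.]MARIPOSA.*"
TOPIC_TEMPLATE = "${databaseName}.${schemaName}.${tableName}"


def is_included(qualified_table, inclusion_regex=INCLUSION_REGEX):
    """Assumption: the regex must match the whole fully qualified table name."""
    return re.fullmatch(inclusion_regex, qualified_table) is not None


def topic_for(qualified_table, template=TOPIC_TEMPLATE):
    """Expand the template for a DATABASE.SCHEMA.TABLE name."""
    database, schema, table = qualified_table.split(".", 2)
    return Template(template).substitute(
        databaseName=database, schemaName=schema, tableName=table
    )


# Hypothetical table names, used only for illustration.
for name in ["ORCL.ADMIN.MARIPOSA_ORDERS", "ORCL.ADMIN.CUSTOMERS"]:
    print(name, is_included(name), topic_for(name))
```

Java and Python regular expression dialects differ in some details, so a pattern that relies on more than basic character classes and quantifiers should be verified against the database itself.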
    

Using the JSON (schemaless) converter

The following worker and connector configuration examples are for running the Oracle CDC Source connector against Confluent Cloud with the (schemaless) JSON (org.apache.kafka.connect.json.JsonConverter) value converter.

Distributed worker configuration

  1. Create your my-connect-distributed-json.properties file based on the following example.

    bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    request.timeout.ms=20000
    retry.backoff.ms=500
    
    producer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    producer.ssl.endpoint.identification.algorithm=https
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=PLAIN
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    
    consumer.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=PLAIN
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    
    offset.flush.interval.ms=10000
    offset.storage.file.filename=/tmp/connect.offsets
    group.id=connect-cluster
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=3
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    
    # Confluent license settings
    confluent.topic.bootstrap.servers=<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>
    confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<kafka-api-key>" password="<kafka-api-secret>";
    confluent.topic.security.protocol=SASL_SSL
    confluent.topic.sasl.mechanism=PLAIN
    
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include
    # any combination of:
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples:
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/share/java,<path-to>/confluent-6.0.0/share/confluent-hub-components
    
    # Enable source connectors to create topics
    # KIP-158
    topic.creation.enable=true
    
  2. Start Kafka Connect with the following command:

    <path-to-confluent-home>/bin/connect-distributed my-connect-distributed-json.properties
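
The worker configuration above sets value.converter.schemas.enable=false. With that setting, JsonConverter writes only the record body; when schemas are enabled, it wraps each value in an envelope with schema and payload keys. The following Python sketch shows a consumer-side helper that accepts either form. The helper name extract_payload and the two-field sample values are hypothetical.

```python
import json


def extract_payload(message_value):
    """Return the record body from a JsonConverter-serialized value.

    With schemas.enable=true, the value is an envelope with "schema" and
    "payload" keys; with schemas.enable=false it is the bare payload.
    """
    decoded = json.loads(message_value)
    if isinstance(decoded, dict) and set(decoded) == {"schema", "payload"}:
        return decoded["payload"]
    return decoded


# Hypothetical values, trimmed to two fields for illustration.
schemaless = '{"ID": 1, "FIRST_NAME": "Rica"}'
with_schema = json.dumps({
    "schema": {"type": "struct", "optional": False, "fields": [
        {"field": "ID", "type": "int64", "optional": False},
        {"field": "FIRST_NAME", "type": "string", "optional": True},
    ]},
    "payload": {"ID": 1, "FIRST_NAME": "Rica"},
})

print(extract_payload(schemaless) == extract_payload(with_schema))
```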
    

Connector configuration

  1. Create your oracle-cdc-confluent-cloud-json.json file based on the following example:

    {
       "name": "OracleCDC_Confluent_Cloud",
       "config":{
          "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
          "name": "OracleCDC_Confluent_Cloud",
          "tasks.max":3,
    
          "oracle.server": "<database-url>",
          "oracle.sid":"<SID of the CDB>",
          "oracle.pdb.name":"<name of the PDB where tables reside. If you don't have PDB, remove this config property>",
          "oracle.username": "<username e.g. C##MYUSER>",
          "oracle.password": "<password>",
          "start.from":"snapshot",
    
          "redo.log.topic.name": "oracle-redo-log-topic",
          "redo.log.consumer.bootstrap.servers":"<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
          "redo.log.consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
          "redo.log.consumer.security.protocol":"SASL_SSL",
          "redo.log.consumer.sasl.mechanism":"PLAIN",
    
          "table.inclusion.regex":"<regex-expression e.g. ORCL[.]ADMIN[.]MARIPOSA.*>",
          "_table.topic.name.template_":"Using template vars to set change event topic for each table",
          "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
          "connection.pool.max.size": 20,
          "confluent.topic.replication.factor":3,
    
          "topic.creation.groups":"redo",
          "topic.creation.redo.include":"oracle-redo-log-topic",
          "topic.creation.redo.replication.factor":3,
          "topic.creation.redo.partitions":1,
          "topic.creation.redo.cleanup.policy":"delete",
          "topic.creation.redo.retention.ms":1209600000,
          "topic.creation.default.replication.factor":3,
          "topic.creation.default.partitions":5,
          "topic.creation.default.cleanup.policy":"compact",
    
          "confluent.topic.bootstrap.servers":"<your-bootstrap-server e.g. xyz.us-central1.gcp.confluent.cloud:9092>",
          "confluent.topic.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-api-key>\" password=\"<kafka-api-secret>\";",
          "confluent.topic.security.protocol":"SASL_SSL",
          "confluent.topic.sasl.mechanism":"PLAIN",
    
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "false"
       }
    }
    
  2. Create oracle-redo-log-topic. Make sure the topic name matches the value you put for "redo.log.topic.name".

    Confluent Platform CLI

    bin/kafka-topics --create --topic oracle-redo-log-topic \
    --bootstrap-server broker:9092 --replication-factor 1 \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    

    Confluent Cloud CLI

    confluent kafka topic create oracle-redo-log-topic \
    --partitions 1 --config cleanup.policy=delete \
    --config retention.ms=1209600000
    
  3. Start the Oracle CDC Source connector using the following command:

    curl -s -H "Content-Type: application/json" -X POST -d @oracle-cdc-confluent-cloud-json.json http://localhost:8083/connectors/ | jq
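
The topic.creation.redo.retention.ms value used in the connector configuration, 1209600000, is 14 days expressed in milliseconds. When choosing a different retention period for the redo log topic, a small conversion helper avoids dropping or adding a digit. The following Python sketch is illustrative; the function names are not part of any Confluent tooling.

```python
from datetime import timedelta

ONE_MS = timedelta(milliseconds=1)


def retention_ms(days):
    """Convert a retention period in whole days to milliseconds."""
    return timedelta(days=days) // ONE_MS


def retention_days(ms):
    """Convert a retention.ms value back to (possibly fractional) days."""
    return timedelta(milliseconds=ms) / timedelta(days=1)


print(retention_ms(14))
print(retention_days(1209600000))
```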
    

Oracle CDC Source Connector output examples

The following are examples of the output the Oracle CDC Source Connector creates.

CUSTOMERS table is created by Admin on Amazon RDS for Oracle

CREATE TABLE CUSTOMERS (
ID NUMBER(10) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(50),
CLUB_STATUS VARCHAR(8),
COMMENTS VARCHAR(90),
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Connector is configured with AVRO schema

"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",

Table-specific topic: Schemas and records created by the Oracle CDC Connector with AvroConverter

Schema for Kafka Record Key

The schema depends on the key.template configuration property. A “long” type is used in this example because the primary key on the table is ID NUMBER(10) NOT NULL PRIMARY KEY:

long

Schema for Kafka Record Value

{
  "fields": [
    {
      "name": "ID",
      "type": "long"
    },
    {
      "default": null,
      "name": "FIRST_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "CLUB_STATUS",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "COMMENTS",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "CREATE_TS",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "table",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "scn",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "op_type",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "op_ts",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "current_ts",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "row_id",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "username",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "type": "record"
}

Kafka record key for a snapshot

1

Kafka record value for a snapshot

{
  "ID": 1,
  "FIRST_NAME": {
    "string": "Rica"
  },
  "CLUB_STATUS": {
    "string": "bronze"
  },
  "COMMENTS": {
    "string": "Universal optimal hierarchy"
  },
  "CREATE_TS": {
    "long": 1634834346041
  },
  "table": {
    "string": "ORCL.ADMIN.CUSTOMERS"
  },
  "scn": {
    "string": "582847"
  },
  "op_type": {
    "string": "R"
  },
  "op_ts": null,
  "current_ts": {
    "string": "1634834582666"
  },
  "row_id": null,
  "username": null
}
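
In the record value above, each non-null nullable field is wrapped in a single-key object such as {"string": "Rica"}. This is how the Avro JSON encoding represents a union value, with the key naming the union branch. As a minimal sketch, the following Python helper strips those wrappers to produce a flat dictionary. The helper name unwrap is hypothetical, and the approach is ambiguous for a record that has exactly one field whose name is also an Avro type name.

```python
# Avro primitive type names that can appear as union branch keys.
PRIMITIVE_BRANCHES = {"string", "long", "int", "boolean", "bytes", "float", "double"}


def unwrap(value, named_types=()):
    """Recursively replace {"<type>": x} union wrappers with x."""
    if isinstance(value, dict):
        if len(value) == 1:
            (branch, inner), = value.items()
            if branch in PRIMITIVE_BRANCHES or branch in named_types:
                return unwrap(inner, named_types)
        return {key: unwrap(item, named_types) for key, item in value.items()}
    if isinstance(value, list):
        return [unwrap(item, named_types) for item in value]
    return value


# Subset of the snapshot record value shown above.
SNAPSHOT_SUBSET = {
    "ID": 1,
    "FIRST_NAME": {"string": "Rica"},
    "CREATE_TS": {"long": 1634834346041},
    "op_type": {"string": "R"},
    "op_ts": None,
}

print(unwrap(SNAPSHOT_SUBSET))
```

Named record branches, which are keyed by their full name, can be passed through the named_types argument.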

Kafka record key for a new insert after Oracle CDC Connector is running

5

Kafka record value for a new insert after Oracle CDC Connector is running

{
  "ID": 5,
  "FIRST_NAME": {
    "string": "Hansiain"
  },
  "CLUB_STATUS": {
    "string": "platinum"
  },
  "COMMENTS": {
    "string": "Centralized full-range approach"
  },
  "CREATE_TS": {
    "long": 1634834933061
  },
  "table": {
    "string": "ORCL.ADMIN.CUSTOMERS"
  },
  "scn": {
    "string": "584627"
  },
  "op_type": {
    "string": "I"
  },
  "op_ts": {
    "string": "1634860133000"
  },
  "current_ts": {
    "string": "1634834938235"
  },
  "row_id": {
    "string": "AAAFhYAAAAAAAMcAAE"
  },
  "username": {
    "string": "ADMIN"
  }
}
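
The CREATE_TS field carries the timestamp-millis logical type, and the current_ts field holds a numeric string of similar magnitude. Assuming both are milliseconds since the Unix epoch, the following Python sketch converts them to timezone-aware UTC datetimes using integer arithmetic, which avoids floating-point rounding. The helper name millis_to_utc is illustrative.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_utc(value):
    """Convert epoch milliseconds (int or numeric string) to an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=int(value))


# Values copied from the insert record above.
create_ts = millis_to_utc(1634834933061)      # CREATE_TS (long)
current_ts = millis_to_utc("1634834938235")   # current_ts (string)

print(create_ts.isoformat())
print((current_ts - create_ts).total_seconds())
```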

Use the configuration parameter output.before.state.field to have the connector include the before state in the change event records for update operations. In the following example, the value specified for this configuration parameter is before.

There are no changes to the change event records for snapshot, insert and delete operations when using this feature.

Schema for Kafka Record Value

{
  "fields": [
    {
      "name": "ID",
      "type": "long"
    },
    {
      "default": null,
      "name": "FIRST_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "CLUB_STATUS",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "COMMENTS",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "CREATE_TS",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "table",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "scn",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "op_type",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "op_ts",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "current_ts",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "row_id",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "username",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "before",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "ID",
              "type": "long"
            },
            {
              "default": null,
              "name": "FIRST_NAME",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "CLUB_STATUS",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "COMMENTS",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "CREATE_TS",
              "type": [
                "null",
                {
                  "connect.name": "org.apache.kafka.connect.data.Timestamp",
                  "connect.version": 1,
                  "logicalType": "timestamp-millis",
                  "type": "long"
                }
              ]
            }
          ],
          "name": "ConnectDefault2",
          "type": "record"
        }
      ]
    }
  ],
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "type": "record"
}

Kafka record key for an update after Oracle CDC Connector is running

5

Kafka record value for an update after Oracle CDC Connector is running

{
  "ID": 5,
  "FIRST_NAME": {
    "string": "Hansiain"
  },
  "CLUB_STATUS": {
    "string": "platinum"
  },
  "COMMENTS": {
    "string": "Centralized full-range approach"
  },
  "CREATE_TS": {
    "long": 1634834933061
  },
  "table": {
    "string": "ORCLCDB.C##MYUSER.CUSTOMERS"
  },
  "scn": {
    "string": "2162621"
  },
  "op_type": {
    "string": "U"
  },
  "op_ts": {
    "string": "1715765856000"
  },
  "current_ts": {
    "string": "1715765857516"
  },
  "row_id": {
    "string": "AAAR34AAHAAAAFbAAG"
  },
  "username": {
    "string": "SYS"
  },
  "before": {
    "io.confluent.connect.avro.ConnectDefault2": {
      "ID": 5,
      "FIRST_NAME": {
        "string": "Hansiain"
      },
      "CLUB_STATUS": {
        "string": "gold"
      },
      "COMMENTS": {
        "string": "Centralized full-range approach"
      },
      "CREATE_TS": {
        "long": 1634834933061
      }
    }
  }
}
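
With the before state included, a consumer can work out which columns an update changed by comparing the nested before record with the top-level column values. The following Python sketch does this for a trimmed copy of the update record above. It assumes the Avro JSON union wrappers shown in the example output and the field name before configured through output.before.state.field; the helper names are hypothetical.

```python
def unwrap(value):
    """Strip one Avro JSON union wrapper such as {"string": "gold"}."""
    if isinstance(value, dict) and len(value) == 1:
        return next(iter(value.values()))
    return value


def changed_columns(record, before_field="before"):
    """Map each changed column to its (old, new) value pair."""
    before = unwrap(record.get(before_field))
    if not before:
        return {}  # no before state is present on this record
    changes = {}
    for column, old in before.items():
        old_value, new_value = unwrap(old), unwrap(record.get(column))
        if old_value != new_value:
            changes[column] = (old_value, new_value)
    return changes


# Trimmed copy of the update record value shown above.
UPDATE_RECORD = {
    "ID": 5,
    "FIRST_NAME": {"string": "Hansiain"},
    "CLUB_STATUS": {"string": "platinum"},
    "op_type": {"string": "U"},
    "before": {
        "io.confluent.connect.avro.ConnectDefault2": {
            "ID": 5,
            "FIRST_NAME": {"string": "Hansiain"},
            "CLUB_STATUS": {"string": "gold"},
        }
    },
}

print(changed_columns(UPDATE_RECORD))
```

This simple unwrap step would misread a table with a single column, because the before record itself would then look like a union wrapper.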

Redo log topic: Schemas and records created by Oracle CDC Connector with AvroConverter

Schema for Kafka Record Key

[
  "null",
  "string"
]

Schema for Kafka Record Value

{
  "fields": [
    {
      "default": null,
      "name": "SCN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "START_SCN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "COMMIT_SCN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "TIMESTAMP",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "START_TIMESTAMP",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "COMMIT_TIMESTAMP",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "XIDUSN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "XIDSLT",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "XIDSQN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "XID",
      "type": [
        "null",
        "bytes"
      ]
    },
    {
      "default": null,
      "name": "PXIDUSN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "PXIDSLT",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "PXIDSQN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "PXID",
      "type": [
        "null",
        "bytes"
      ]
    },
    {
      "default": null,
      "name": "TX_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "OPERATION",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "OPERATION_CODE",
      "type": [
        "null",
        {
          "connect.type": "int8",
          "type": "int"
        }
      ]
    },
    {
      "default": null,
      "name": "ROLLBACK",
      "type": [
        "null",
        "boolean"
      ]
    },
    {
      "default": null,
      "name": "SEG_OWNER",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "SEG_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "TABLE_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "SEG_TYPE",
      "type": [
        "null",
        {
          "connect.type": "int8",
          "type": "int"
        }
      ]
    },
    {
      "default": null,
      "name": "SEG_TYPE_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "TABLE_SPACE",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "ROW_ID",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "USERNAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "OS_USERNAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "MACHINE_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "AUDIT_SESSIONID",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SESSION_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SERIAL_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SESSION_INFO",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "THREAD_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SEQUENCE_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "RBASQN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "RBABLK",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "RBABYTE",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "UBAFIL",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "UBABLK",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "UBAREC",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "UBASQN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "ABS_FILE_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "REL_FILE_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "DATA_BLK_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "DATA_OBJ_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "DATA_OBJV_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "DATA_OBJD_NUM",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SQL_REDO",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "SQL_UNDO",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "RS_ID",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "SSN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "CSF",
      "type": [
        "null",
        "boolean"
      ]
    },
    {
      "default": null,
      "name": "INFO",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "STATUS",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "default": null,
      "name": "REDO_VALUE",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "UNDO_VALUE",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SAFE_RESUME_SCN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "CSCN",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "OBJECT_ID",
      "type": [
        "null",
        "bytes"
      ]
    },
    {
      "default": null,
      "name": "EDITION_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "CLIENT_ID",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "SRC_CON_NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "SRC_CON_ID",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SRC_CON_UID",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SRC_CON_DBID",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "SRC_CON_GUID",
      "type": [
        "null",
        "bytes"
      ]
    },
    {
      "default": null,
      "name": "CON_ID",
      "type": [
        "null",
        "boolean"
      ]
    }
  ],
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "type": "record"
}

Kafka record key for a new insert after Oracle CDC Connector is running

CUSTOMERS

Kafka record value for a new insert after Oracle CDC Connector is running

{
  "SCN": {
    "long": 584627
  },
  "START_SCN": {
    "long": 584627
  },
  "COMMIT_SCN": {
    "long": 584628
  },
  "TIMESTAMP": {
    "long": 1634860133000
  },
  "START_TIMESTAMP": {
    "long": 1634860133000
  },
  "COMMIT_TIMESTAMP": {
    "long": 1634860133000
  },
  "XIDUSN": {
    "long": 10
  },
  "XIDSLT": {
    "long": 12
  },
  "XIDSQN": {
    "long": 434
  },
  "XID": {
    "bytes": "\n\u0000\f\u0000²\u0001\u0000\u0000"
  },
  "PXIDUSN": {
    "long": 10
  },
  "PXIDSLT": {
    "long": 12
  },
  "PXIDSQN": {
    "long": 434
  },
  "PXID": {
    "bytes": "\n\u0000\f\u0000²\u0001\u0000\u0000"
  },
  "TX_NAME": null,
  "OPERATION": {
    "string": "INSERT"
  },
  "OPERATION_CODE": {
    "int": 1
  },
  "ROLLBACK": {
    "boolean": false
  },
  "SEG_OWNER": {
    "string": "ADMIN"
  },
  "SEG_NAME": {
    "string": "CUSTOMERS"
  },
  "TABLE_NAME": {
    "string": "CUSTOMERS"
  },
  "SEG_TYPE": {
    "int": 2
  },
  "SEG_TYPE_NAME": {
    "string": "TABLE"
  },
  "TABLE_SPACE": {
    "string": "USERS"
  },
  "ROW_ID": {
    "string": "AAAFhYAAAAAAAMcAAE"
  },
  "USERNAME": {
    "string": "ADMIN"
  },
  "OS_USERNAME": {
    "string": "UNKNOWN"
  },
  "MACHINE_NAME": {
    "string": "UNKNOWN"
  },
  "AUDIT_SESSIONID": {
    "long": 50049
  },
  "SESSION_NUM": {
    "long": 1269
  },
  "SERIAL_NUM": {
    "long": 35981
  },
  "SESSION_INFO": {
    "string": "UNKNOWN"
  },
  "THREAD_NUM": {
    "long": 1
  },
  "SEQUENCE_NUM": {
    "long": 4
  },
  "RBASQN": {
    "long": 18
  },
  "RBABLK": {
    "long": 311
  },
  "RBABYTE": {
    "long": 16
  },
  "UBAFIL": {
    "long": 3
  },
  "UBABLK": {
    "long": 19960
  },
  "UBAREC": {
    "long": 8
  },
  "UBASQN": {
    "long": 89
  },
  "ABS_FILE_NUM": {
    "long": 4
  },
  "REL_FILE_NUM": {
    "long": 0
  },
  "DATA_BLK_NUM": {
    "long": 796
  },
  "DATA_OBJ_NUM": {
    "long": 22616
  },
  "DATA_OBJV_NUM": {
    "long": 1
  },
  "DATA_OBJD_NUM": {
    "long": 22616
  },
  "SQL_REDO": {
    "string": "insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('5','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-10-21 09:48:53.061'));"
  },
  "SQL_UNDO": {
    "string": "delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '5' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-10-21 09:48:53.061') and ROWID = 'AAAFhYAAAAAAAMcAAE';"
  },
  "RS_ID": {
    "string": " 0x000012.00000137.0010 "
  },
  "SSN": {
    "long": 4
  },
  "CSF": {
    "boolean": false
  },
  "INFO": null,
  "STATUS": {
    "int": 0
  },
  "REDO_VALUE": {
    "long": 3616
  },
  "UNDO_VALUE": {
    "long": 3617
  },
  "SAFE_RESUME_SCN": {
    "long": 0
  },
  "CSCN": {
    "long": 584628
  },
  "OBJECT_ID": null,
  "EDITION_NAME": null,
  "CLIENT_ID": {
    "string": "UNKNOWN"
  },
  "SRC_CON_NAME": {
    "string": "ORCL"
  },
  "SRC_CON_ID": {
    "long": 0
  },
  "SRC_CON_UID": {
    "long": 0
  },
  "SRC_CON_DBID": {
    "long": 0
  },
  "SRC_CON_GUID": null,
  "CON_ID": {
    "boolean": false
  }
}
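
The record value above uses the Avro JSON encoding, in which each non-null value of a nullable field is wrapped in a single-key object that names its type (for example, {"long": 584627}). The following Python sketch shows one way to strip those wrappers when inspecting console consumer output. The helper names unwrap and flatten_record are hypothetical, and the sample line is a shortened record built from fields shown above.

```python
import json


def unwrap(value):
    """Strip the Avro JSON union wrapper, e.g. {"long": 584627} -> 584627."""
    # Assumption: every wrapped field is a one-key {"<type>": <value>} object,
    # as in the console consumer output shown above.
    if isinstance(value, dict) and len(value) == 1:
        return next(iter(value.values()))
    return value


def flatten_record(raw_json):
    """Parse one consumer output line and unwrap each top-level field."""
    record = json.loads(raw_json)
    return {name: unwrap(field) for name, field in record.items()}


# Shortened sample built from fields of the record above.
sample = (
    '{"SCN": {"long": 584627}, "OPERATION": {"string": "INSERT"}, '
    '"TX_NAME": null, "ROLLBACK": {"boolean": false}}'
)
flat = flatten_record(sample)
print(flat["SCN"], flat["OPERATION"], flat["TX_NAME"], flat["ROLLBACK"])
```

The sketch treats any single-key object as a wrapper, so a field that legitimately holds a one-entry map or nested record would need separate handling.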

Numeric mapping examples

This section includes numeric.mapping examples for the Oracle CDC connector.

SMT Examples

This section includes Single Message Transform (SMT) examples for the Oracle CDC Connector.

TimestampConverter SMT

Suppose you have the following table:

CREATE TABLE CUSTOMERS (
  ID NUMBER(10) NOT NULL PRIMARY KEY,
  FIRST_NAME VARCHAR(50),
  CLUB_STATUS VARCHAR(8),
  COMMENTS VARCHAR(90),
  CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP );

If you run a connector, you may see a record similar to the following:

{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"long":1636048731469},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"589654"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636049137001"},"row_id":null,"username":null}

The timestamp is represented as epoch milliseconds in "CREATE_TS":{"long":1636048731469}. If you want a string formatted with a pattern that is compatible with java.text.SimpleDateFormat instead, you can use the TimestampConverter SMT as shown in the following example:

"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"transforms.TimestampConverter.field": "CREATE_TS",
"transforms.TimestampConverter.target.type": "string"

TimestampConverter SMT converts "CREATE_TS":{"long":1636048731469} to "CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"}:

{"ID":1,"FIRST_NAME":{"string":"Rica"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"string":"2021-11-04T17:58:51.469+0000"},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"594158"},"op_type":{"string":"R"},"op_ts":null,"current_ts":{"string":"1636050925327"},"row_id":null,"username":null}
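
The conversion above can be reproduced outside the connector to check the expected output. The following Python sketch formats epoch milliseconds the same way as the pattern yyyy-MM-dd'T'HH:mm:ss.SSSZ, assuming the UTC time zone that the +0000 offset in the converted record suggests. The function name format_epoch_millis is hypothetical.

```python
from datetime import datetime, timezone


def format_epoch_millis(epoch_ms):
    """Render epoch milliseconds as yyyy-MM-dd'T'HH:mm:ss.SSSZ in UTC."""
    # Integer division keeps the millisecond part exact (no float rounding).
    seconds, millis = divmod(epoch_ms, 1000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # %z renders the UTC offset as +0000, like the Z pattern letter.
    return (
        moment.strftime("%Y-%m-%dT%H:%M:%S")
        + f".{millis:03d}"
        + moment.strftime("%z")
    )


# CREATE_TS value from the record above.
print(format_epoch_millis(1636048731469))  # prints 2021-11-04T17:58:51.469+0000
```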

InsertField SMT

Suppose you want to insert a field that specifies the origin of records. You can add the InsertField SMT to the connector configuration as shown in the following example:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database"

After the connector is up and running, if changes (for example, INSERT, UPDATE, or DELETE) happen in the tables you are watching, you will see that "Origin":{"string":"Oracle Database"} is added to records in a table-specific topic as shown in the following example:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636058280541},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"611987"},"op_type":{"string":"I"},"op_ts":{"string":"1636083480000"},"current_ts":{"string":"1636058282236"},"row_id":{"string":"AAAFhxAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}

If you look at the redo log topic, you will also see "Origin":{"string":"Oracle Database"} being added, and this is probably not what you want:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":611987},"START_SCN":{"long":611987},"COMMIT_SCN":{"long":611988},"TIMESTAMP":{"long":1636083480000},"START_TIMESTAMP":{"long":1636083480000},"COMMIT_TIMESTAMP":{"long":1636083480000},"XIDUSN":{"long":3},"XIDSLT":{"long":33},"XIDSQN":{"long":407},"XID":{"bytes":"\u0003\u0000!\u0000—\u0001\u0000\u0000"},"PXIDUSN":{"long":3},"PXIDSLT":{"long":33},"PXIDSQN":{"long":407},"PXID":{"bytes":"\u0003\u0000!\u0000—\u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhxAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":55},"RBABLK":{"long":13},"RBABYTE":{"long":208},"UBAFIL":{"long":3},"UBABLK":{"long":169},"UBAREC":{"long":19},"UBASQN":{"long":74},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22641},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22641},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 13:38:00.541'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 13:38:00.541') and ROWID = 'AAAFhxAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x000037.0000000d.00d0 "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":22},"UNDO_VALUE":{"long":23},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":611988},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false},"Origin":{"string":"Oracle Database"}}

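To apply an SMT to certain topics only, you can use a predicate (see KIP-585: Filter and Conditional SMTs). The following example applies InsertField only to topics whose names match ORCL.ADMIN.*: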

"transforms": "InsertField",
"transforms.InsertField.predicate":"has-my-prefix",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "Origin",
"transforms.InsertField.static.value": "Oracle Database",
"predicates":"has-my-prefix",
"predicates.has-my-prefix.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern":"ORCL.ADMIN.*"

After the connector is up and running, if changes (for example, INSERT, UPDATE, or DELETE) happen in the tables you are interested in, you will see "Origin":{"string":"Oracle Database"} being added to a table-specific topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ORCL.ADMIN.CUSTOMERS --from-beginning
...
{"ID":15,"FIRST_NAME":{"string":"Hansiain"},"CLUB_STATUS":{"string":"platinum"},"COMMENTS":{"string":"Centralized full-range approach"},"CREATE_TS":{"long":1636060255772},"table":{"string":"ORCL.ADMIN.CUSTOMERS"},"scn":{"string":"617372"},"op_type":{"string":"I"},"op_ts":{"string":"1636085455000"},"current_ts":{"string":"1636060258267"},"row_id":{"string":"AAAFhzAAAAAAAMMAAO"},"username":{"string":"ADMIN"},"Origin":{"string":"Oracle Database"}}
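
The predicate configured above decides, per topic, whether the InsertField SMT runs. The following Python sketch mimics that gating under the assumption that the predicate requires the whole topic name to match the pattern. The function names topic_matches and insert_origin are hypothetical and are not part of Kafka Connect.

```python
import re

# Pattern copied from predicates.has-my-prefix.pattern above.
PATTERN = re.compile(r"ORCL.ADMIN.*")


def topic_matches(topic, pattern=PATTERN):
    """Return True when the entire topic name matches the pattern."""
    # Assumption: full-name matching, hence fullmatch rather than search.
    return pattern.fullmatch(topic) is not None


def insert_origin(topic, value):
    """Return a copy of value with the static Origin field if the topic matches."""
    if not topic_matches(topic):
        return value
    return {**value, "Origin": "Oracle Database"}


print(insert_origin("ORCL.ADMIN.CUSTOMERS", {"ID": 15}))
print(insert_origin("redo-log-topic", {"SCN": 617372}))
# The unescaped dots match any character, so this topic name also matches.
print(topic_matches("ORCL-ADMIN-ORDERS"))
```

Because the dots in ORCL.ADMIN.* are not escaped, the pattern is looser than a literal prefix check. If that matters in your environment, a stricter pattern such as ORCL\.ADMIN\..* limits the match to names that start with the literal text ORCL.ADMIN. (including the trailing dot).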

Note that "Origin":{"string":"Oracle Database"} does not appear in the redo log topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic redo-log-topic --from-beginning
...
{"SCN":{"long":617372},"START_SCN":{"long":617371},"COMMIT_SCN":{"long":617373},"TIMESTAMP":{"long":1636085455000},"START_TIMESTAMP":{"long":1636085455000},"COMMIT_TIMESTAMP":{"long":1636085455000},"XIDUSN":{"long":9},"XIDSLT":{"long":31},"XIDSQN":{"long":416},"XID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"PXIDUSN":{"long":9},"PXIDSLT":{"long":31},"PXIDSQN":{"long":416},"PXID":{"bytes":"\t\u0000\u001F\u0000 \u0001\u0000\u0000"},"TX_NAME":null,"OPERATION":{"string":"INSERT"},"OPERATION_CODE":{"int":1},"ROLLBACK":{"boolean":false},"SEG_OWNER":{"string":"ADMIN"},"SEG_NAME":{"string":"CUSTOMERS"},"TABLE_NAME":{"string":"CUSTOMERS"},"SEG_TYPE":{"int":2},"SEG_TYPE_NAME":{"string":"TABLE"},"TABLE_SPACE":{"string":"USERS"},"ROW_ID":{"string":"AAAFhzAAAAAAAMMAAO"},"USERNAME":{"string":"ADMIN"},"OS_USERNAME":{"string":"UNKNOWN"},"MACHINE_NAME":{"string":"UNKNOWN"},"AUDIT_SESSIONID":{"long":50052},"SESSION_NUM":{"long":1259},"SERIAL_NUM":{"long":58603},"SESSION_INFO":{"string":"UNKNOWN"},"THREAD_NUM":{"long":1},"SEQUENCE_NUM":{"long":6},"RBASQN":{"long":61},"RBABLK":{"long":231},"RBABYTE":{"long":140},"UBAFIL":{"long":3},"UBABLK":{"long":2952},"UBAREC":{"long":12},"UBASQN":{"long":70},"ABS_FILE_NUM":{"long":4},"REL_FILE_NUM":{"long":0},"DATA_BLK_NUM":{"long":780},"DATA_OBJ_NUM":{"long":22643},"DATA_OBJV_NUM":{"long":1},"DATA_OBJD_NUM":{"long":22643},"SQL_REDO":{"string":"insert into \"ADMIN\".\"CUSTOMERS\"(\"ID\",\"FIRST_NAME\",\"CLUB_STATUS\",\"COMMENTS\",\"CREATE_TS\") values ('15','Hansiain','platinum','Centralized full-range approach',TO_TIMESTAMP('2021-11-04 14:10:55.772'));"},"SQL_UNDO":{"string":"delete from \"ADMIN\".\"CUSTOMERS\" where \"ID\" = '15' and \"FIRST_NAME\" = 'Hansiain' and \"CLUB_STATUS\" = 'platinum' and \"COMMENTS\" = 'Centralized full-range approach' and \"CREATE_TS\" = TO_TIMESTAMP('2021-11-04 14:10:55.772') and ROWID = 'AAAFhzAAAAAAAMMAAO';"},"RS_ID":{"string":" 0x00003d.000000e7.008c "},"SSN":{"long":14},"CSF":{"boolean":false},"INFO":null,"STATUS":{"int":0},"REDO_VALUE":{"long":16},"UNDO_VALUE":{"long":17},"SAFE_RESUME_SCN":{"long":0},"CSCN":{"long":617373},"OBJECT_ID":null,"EDITION_NAME":null,"CLIENT_ID":{"string":"UNKNOWN"},"SRC_CON_NAME":{"string":"ORCL"},"SRC_CON_ID":{"long":0},"SRC_CON_UID":{"long":0},"SRC_CON_DBID":{"long":0},"SRC_CON_GUID":null,"CON_ID":{"boolean":false}}

PostgreSQL Example

This section includes an example of how to move records from Oracle Database to PostgreSQL using the Oracle CDC Source and the JDBC Sink connectors.

  1. Create an Oracle CDC Source connector.

    The following configuration creates a snapshot and stores new changes (inserts) in a table-specific topic called ORCLCDB.C__MYUSER.USERS:

      {
          "name": "SimpleOracleCDC_DEMO",
          "config":{
            "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
            "name": "SimpleOracleCDC_DEMO",
            "tasks.max":3,
            "key.converter":"org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://localhost:8081",
            "confluent.topic.bootstrap.servers":"localhost:9092",
            "oracle.server": "localhost",
            "oracle.port": 1521,
            "oracle.sid":"ORCLCDB",
            "oracle.username": "C##MYUSER",
            "oracle.password": "mypassword",
            "start.from":"snapshot",
            "redo.log.topic.name": "redo-log-topic",
            "redo.log.consumer.bootstrap.servers":"localhost:9092",
            "table.inclusion.regex": ".*USERS.*",
            "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
            "connection.pool.max.size": 20,
            "confluent.topic.replication.factor":1,
            "lob.topic.name.template":"${databaseName}.${schemaName}.${tableName}.${columnName}",
            "redo.log.row.fetch.size":1,
            "numeric.mapping": "best_fit"
          }
      }
    

    A sample record in ORCLCDB.C__MYUSER.USERS:

    {"ID":241,"FIRST_NAME":{"string":"Lettie"},"LAST_NAME":{"string":"Kaplan"},"EMAIL":{"string":"Lettie.Kaplan@utvel.us"},"GENDER":{"string":"male"},"CLUB_STATUS":{"string":"active"},"COMMENTS":{"string":"Confluent"},"UPDATE_TS":{"long":1623831883974},"table":{"string":"ORCLCDB.C##MYUSER.USERS"},"scn":{"string":"1450183"},"op_type":{"string":"I"},"op_ts":{"string":"1623857084000"},"current_ts":{"string":"1623831886610"},"row_id":{"string":"AAAR9JAAHAAAACFAAg"},"username":{"string":"C##MYUSER"}}
    
  2. Create a JDBC Sink connector.

    You can use a Single Message Transform (SMT) to drop the ORCLCDB.C__MYUSER. prefix from the topic name, which enables the connector to UPSERT records to a USERS table in PostgreSQL as shown in the following example:

    {
      "name": "jdbc_sink_postgres_demo",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://<postgres-url>:5432/<database-name>",
        "connection.user": "<user>",
        "connection.password": "<password>",
        "tasks.max": "2",
        "topics": "ORCLCDB.C__MYUSER.USERS",
        "auto.create": "true",
        "auto.evolve": "true",
        "dialect.name": "PostgreSqlDatabaseDialect",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields":"ID",
        "batch.size": 1,
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "transforms":"dropPrefix",
        "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex":"ORCLCDB.C__MYUSER.(.*)",
        "transforms.dropPrefix.replacement":"$1",
        "errors.tolerance":"all",
        "errors.deadletterqueue.topic.name":"dlq-jdbc-sink",
        "errors.deadletterqueue.context.headers.enable": "true",
        "errors.deadletterqueue.topic.replication.factor":"1"
      }
    }
    
  3. Check PostgreSQL and verify that the data looks similar to the following:

    Oracle CDC connector PostgreSQL example
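
The dropPrefix transform in step 2 rewrites the topic name so that records from ORCLCDB.C__MYUSER.USERS land in the USERS table. The following Python sketch illustrates that renaming, assuming the transform replaces the topic name only when the whole name matches the regex. The function name route_topic is hypothetical, and Python writes the first capture group as \1 where the connector configuration uses $1.

```python
import re


def route_topic(topic, regex, replacement):
    """Rename the topic if the whole name matches regex; otherwise keep it."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    # expand() substitutes capture groups (\1) into the replacement template.
    return match.expand(replacement)


# Regex copied from transforms.dropPrefix.regex in step 2.
DROP_PREFIX = r"ORCLCDB.C__MYUSER.(.*)"

print(route_topic("ORCLCDB.C__MYUSER.USERS", DROP_PREFIX, r"\1"))
print(route_topic("redo-log-topic", DROP_PREFIX, r"\1"))
```

A topic that does not match the regex keeps its original name, which is why the sink configuration lists only the table-specific topic in topics.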