Oracle CDC Source Connector for Confluent Cloud

Oracle 11g, 12c and 18c Deprecation

Oracle discontinued support for the following Oracle Database versions:

  • Version 11g on December 31, 2020
  • Version 12c on March 31, 2022
  • Version 18c on June 30, 2021

Oracle CDC connector support for each of these versions will reach end-of-life on June 30, 2025. Confluent currently supports Oracle Database versions 19c and later.

The fully-managed Oracle CDC Source connector for Confluent Cloud captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics. The connector uses Oracle LogMiner to read the database redo log. The connector requires a database user with permissions to use LogMiner and permissions to select from all of the tables captured by the connector.

Note

This is a Quick Start for the fully-managed cloud connector. If you are installing the connector locally for Confluent Platform, see Oracle CDC Source for Confluent Platform.

The connector can be configured to capture a subset of the tables in a single database, defined as all tables accessible by the user that match an include regular expression. It can also be configured to not capture tables that match a separate exclude regular expression.

The connector writes the changes from each of the tables to Kafka topics, where the table-to-topic mapping is determined by the table.topic.name.template connector configuration property. This property defaults to the dot-delimited fully-qualified name of the table (for example, database_name.schema_name.table_name).

The connector recognizes literals and several variables (for example, ${tableName} and ${schemaName}) to customize table-to-topic mapping. Variables are resolved at runtime. For example, the following configuration property results in changes to the ORCL.ADMIN.USERS table being written to the Kafka topic named my-prefix.ORCL.ADMIN.USERS.

table.topic.name.template=my-prefix.${databaseName}.${schemaName}.${tableName}

For a list of template variables, see Template variables.

The connector is designed to write all of the raw Oracle redo log records to one Kafka topic, logically referred to as the “redo log topic”. The redo.log.topic.name configuration property determines the name of this topic. The connector consumes this topic to identify and produce all of the table-specific events written to the table-specific topics. To write only to the redo log topic without generating table-specific change events, set the table.topic.name.template property to an empty string.
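
For example, to run the connector in this redo-log-only mode, you could set the properties as follows (the redo log topic name shown is the connector's default template):

table.topic.name.template=
redo.log.topic.name=${connectorName}-${databaseName}-redo-log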

Limitations

Be sure to review the connector limitations before you run the connector.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Oracle CDC Source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in an Oracle database and then monitoring and recording all subsequent row-level changes.

Important

Prerequisites
  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
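      For example, the following Confluent CLI command creates an API key and secret scoped to a Kafka cluster (the cluster ID shown is a placeholder):

      confluent api-key create --resource lkc-123456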

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the Oracle Database Source connector card.

Oracle CDC Source Connector Card

Step 4: Enter the connector details

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

At the Add Oracle CDC Source Premium Connector screen, complete the following:

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
    • My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
    • Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
    • Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
  2. Click Continue.

Step 5: Check the Kafka topic

After the connector is running, verify that records are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

  • Make sure you have all your prerequisites completed.
  • The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

For example:

confluent connect plugin describe OracleCdcSource

Example output:

The following are required configs:
connector.class : OracleCdcSource
name
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
oracle.server
oracle.sid
oracle.username
table.inclusion.regex

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "name": "OracleCdcSourceConnector_0",
  "config": {
    "connector.class": "OracleCdcSource",
    "name": "OracleCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**************************************************",
    "oracle.server": "database-5.abcdefg12345.us-west-2.rds.amazonaws.com",
    "oracle.port": "1521",
    "oracle.sid": "ORCL",
    "oracle.username": "admin",
    "oracle.password": "**********",
    "table.inclusion.regex": "ORCL[.]ADMIN[.]CUSTOMERS",
    "start.from": "SNAPSHOT",
    "query.timeout.ms": "60000",
    "redo.log.row.fetch.size": "1",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "lob.topic.name.template":"${databaseName}.${schemaName}.${tableName}.${columnName}",
    "numeric.mapping": "BEST_FIT_OR_DOUBLE",
    "output.data.key.format": "JSON_SR",
    "output.data.value.format": "JSON_SR",
    "tasks.max": "2"
  }
}

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "connector.class": Identifies the connector plugin name.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "table.inclusion.regex": The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, in addition to the example shown in the configuration above, an example expression could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, the example expression could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers).

  • "start.from": What the connector should do when it starts for the first time. The value is either the literal snapshot (the default), the literal current, the literal force_current, an Oracle System Change Number (SCN), or a database timestamp in the form DD-MON-YYYY HH24:MI:SS. The snapshot literal instructs the connector to snapshot captured tables the first time it is started, then continue processing redo log events from the point in time when the snapshot was taken. The current literal instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current, but ignores any previously stored offsets when the connector is restarted. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs.

  • "query.timeout.ms": The timeout in milliseconds for any query submitted to Oracle. The default is five minutes (or 300000 milliseconds). If set to a negative value, the connector does not enforce timeout on queries.

  • "redo.log.row.fetch.size": The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. The default is 5000 rows. Use 0 to disable this hint.

  • "table.topic.name.template": The template that defines the name of the Kafka topic where the connector writes a change event. The value can be a constant if the connector writes all records from all captured tables to one topic. Or, the value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, and so on. The default is an empty string, which indicates that the connector is not producing change event records. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. You can leave this property empty only when a redo log topic is specified. If left empty, the connector only writes to the redo log topic.

  • "numeric.mapping": Map NUMERIC values by precision and optionally scale to primitive or decimal types. The none option is the default, but may lead to serialization issues since the Connect DECIMAL type is mapped to its binary representation. One of the best_fit_or... options are generally preferred. The following options can be used:

    • Use none if you want all NUMERIC columns to be represented by the Connect DECIMAL logical type.
    • Use best_fit_or_decimal if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect DECIMAL logical type is used instead.
    • Use best_fit_or_double if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect FLOAT64 type is used instead.
    • Use best_fit_or_string if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect STRING type is used instead.
    • Use precision_only to map NUMERIC columns based only on the column’s precision, assuming the column’s scale is 0.
    • For backward compatibility, the best_fit option is also available. It behaves the same as best_fit_or_decimal.
  • "output.data.key.format": Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • "output.data.value.format": Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • "tasks.max": Sets the maximum number of tasks in use by the connector. More tasks may improve performance. Defaults to 2 tasks.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs. For a few example SMTs, see SMT Examples for Oracle CDC Source Connector for Confluent Cloud.

See Configuration Properties for all properties and definitions.

Step 4: Load the properties file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file oracle-cdc-source.json

Example output:

Created connector OracleCdcSource_0 lcc-ix4dl

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID          |            Name       | Status  |  Type
+-----------+-----------------------+---------+-------+
lcc-ix4dl   | OracleCdcSource_0     | RUNNING | source

Step 6: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.
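
For example, with the example configuration above, changes for the ORCL.ADMIN.CUSTOMERS table land in a topic of the same name, which you can inspect with the Confluent CLI (a value format flag may also be needed for schema-based formats such as JSON_SR):

confluent kafka topic consume ORCL.ADMIN.CUSTOMERS --from-beginning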

Important

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Configuration Properties

Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high
kafka.service.account.id

The service account that is used to generate API keys to communicate with the Kafka cluster.

  • Type: string
  • Importance: high
kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string
  • Default: default
  • Importance: medium

How should we connect to your database?

oracle.server

The hostname or address for the Oracle server.

  • Type: string
  • Valid Values: Must match the regex ^[^\?=%&\(\)]*$
  • Importance: high
oracle.port

The port number used to connect to Oracle.

  • Type: int
  • Default: 1521
  • Valid Values: [1,…,65535]
  • Importance: high
oracle.sid

The Oracle system identifier (SID) of a multi-tenant container database (CDB) or non-multitenant database where tables reside. Confluent recommends you use oracle.service.name to connect to the database using service names instead of using the SID. Maps to the SID parameter in the connect descriptor.

  • Type: string
  • Valid Values: Must match the regex ^[a-zA-Z][a-zA-Z0-9$#_]*$
  • Importance: high
oracle.pdb.name

The name of the pluggable database (PDB). This is not required when tables reside in the CDB$ROOT database, or if you’re using a non-container database.

  • Type: string
  • Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#_]*)*$
  • Importance: high
oracle.service.name

The Oracle service name. If set, the connector always connects to the database using the provided service name. The oracle.service.name maps to the SERVICE_NAME parameter in the connect descriptor. For the multitenant container database (CDB) or non-multitenant database, this does not need to be specified. Confluent recommends you set the oracle.service.name to the container database (CDB) service name when using a pluggable database (PDB). When this property is set, it is used in the connect descriptor instead of oracle.sid.

  • Type: string
  • Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#_.]*)*$
  • Importance: low
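
For example, when capturing tables in a pluggable database, a configuration following the recommendation above might include the following fragment (the service and PDB names are illustrative):

"oracle.service.name": "ORCLCDB",
"oracle.pdb.name": "ORCLPDB1"
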
oracle.username

The name of the Oracle database user.

  • Type: string
  • Valid Values: Must match the regex ^[^\?=%&\(\)]*$
  • Importance: high
oracle.password

The password for the Oracle database user.

  • Type: password
  • Importance: high
ssl.truststorefile

The trust store containing the server CA certificate. Only required when using SSL to connect to the database.

  • Type: password
  • Default: [hidden]
  • Importance: low
ssl.truststorepassword

The password for the trust store containing the server CA certificate. Only required when using SSL to connect to the database.

  • Type: password
  • Default: [hidden]
  • Importance: low
oracle.fan.events.enable

Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events will not be used even if they are supported by the database. This should only be enabled when using Oracle RAC set up with FAN events. Enabling this feature may cause connection issues when the database is not set up to use FAN events.

  • Type: boolean
  • Default: false
  • Importance: low

Database details

table.inclusion.regex

The regular expression that matches the fully-qualified names of the tables to capture. The values are matched (case sensitive) against the object names stored in the data dictionary (uppercase unless created as quoted identifiers; database and PDB names are always stored as uppercase in the data dictionary). Ensure the SID part of the identifier uses the same casing as the oracle.sid value specified. For a non-container database, the fully-qualified name includes the SID and schema name, for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\.MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (CDB), the fully-qualified name includes the SID and schema name, for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\.C##MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name, for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\.C##MYUSER\.(ORDERS|CUSTOMERS).

  • Type: string
  • Importance: high
table.exclusion.regex

The regular expression that matches the fully-qualified names of the tables to exclude. The values are matched (case sensitive) against the object names stored in the data dictionary (uppercase unless created as quoted identifiers; database and PDB names are always stored as uppercase in the data dictionary). Ensure the SID part of the identifier uses the same casing as the oracle.sid value specified. For a non-container database, the fully-qualified name includes the SID and schema name, for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\.MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (CDB), the fully-qualified name includes the SID and schema name, for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\.C##MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name, for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\.C##MYUSER\.(ORDERS|CUSTOMERS).

  • Type: string
  • Default: “”
  • Importance: high
oracle.supplemental.log.level

Database supplemental logging level for connector operation. If set to full, the connector validates that the supplemental logging level on the database is FULL and then captures snapshots and CDC events for the specified tables whenever table.topic.name.template is not set to "". When the level is set to msl (minimal supplemental logging), the connector does not capture CDC change events; it only captures snapshots if table.topic.name.template is not set to "". Note that this setting is irrelevant if table.topic.name.template is set to ""; in this case, only redo logs are captured. This setting defaults to the full supplemental logging level.

  • Type: string
  • Default: full
  • Valid Values: full, msl
  • Importance: medium
start.from

When starting for the first time, this is the position in the redo log where the connector should start. Specifies an Oracle System Change Number (SCN) or a database timestamp with the format yyyy-MM-dd HH:mm:SS in the database time zone. Defaults to the literal snapshot, which instructs the connector to perform an initial snapshot of each captured table before capturing changes. The literal current instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current, but it ignores any previously stored offsets when the connector is restarted. Use this option cautiously because it can result in losing changes between the SCN stored in the offsets and the current SCN. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs. Every option other than force_current causes the connector to resume from the stored offsets in case of task restarts or rebalances.

  • Type: string
  • Default: snapshot
  • Importance: medium

Connector details

emit.tombstone.on.delete

If true, delete operations emit a tombstone record with null value.

  • Type: boolean
  • Default: false
  • Importance: low
behavior.on.dictionary.mismatch

Specifies the desired behavior when the connector is not able to parse the value of a column due to a dictionary mismatch caused by a DDL statement. This can happen if the online dictionary mode is specified but the connector is streaming historical data recorded before the DDL changes occurred. The default option fail causes the connector task to fail. The log option logs the unparsable record and skips the problematic record without failing the connector task.

  • Type: string
  • Default: fail
  • Valid Values: fail, log
  • Importance: low
behavior.on.unparsable.statement

Specifies the desired behavior when the connector encounters a SQL statement that cannot be parsed. The default option fail causes the connector task to fail. The log option logs the unparsable statement and skips the problematic record without failing the connector task.

  • Type: string
  • Default: fail
  • Valid Values: fail, log
  • Importance: low
db.timezone

Default timezone to assume when parsing Oracle DATE and TIMESTAMP types for which timezone info is not available. For example, if db.timezone=UTC, data for both DATE and TIMESTAMP is parsed as if in UTC timezone. The value has to be a valid java.util.TimeZone ID.

  • Type: string
  • Default: UTC
  • Importance: low
db.timezone.date

The default timezone to assume when parsing the Oracle DATE type for which timezone information is not available. If db.timezone.date is set, the value of db.timezone for the DATE type is overridden with the value in db.timezone.date. For example, if db.timezone=UTC and db.timezone.date=America/Los_Angeles, TIMESTAMP data is parsed as if in the UTC timezone, and DATE data is parsed as if in the America/Los_Angeles timezone. The value has to be a valid java.util.TimeZone ID.

  • Type: string
  • Importance: low
redo.log.startup.polling.limit.ms

The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. When this wait time expires, the connector moves to a failed state.

  • Type: long
  • Default: 300000 (5 minutes)
  • Valid Values: [0,…,3600000]
  • Importance: low
heartbeat.interval.ms

The interval in milliseconds at which the connector emits heartbeat records to a heartbeat topic named ${connectorName}-${databaseName}-heartbeat-topic. Heartbeats are useful for moving the connector offsets forward and ensuring the offsets track the latest SCN the connector processed. The default is 0 milliseconds, which disables the heartbeat mechanism. Confluent recommends that you set the heartbeat.interval.ms parameter to a value on the order of minutes to hours in environments where the connector is configured to capture infrequently updated tables, so the source offsets can move forward. Otherwise, a task restart could cause the connector to fail with an ORA-01291 missing logfile error if the archived redo log file corresponding to the stored source offset has been purged from the database.

  • Type: long
  • Default: 0
  • Valid Values: [0,…]
  • Importance: low
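
For example, in an environment with infrequently updated tables, you might set the interval to one hour, in line with the recommendation above:

"heartbeat.interval.ms": "3600000"
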
log.mining.end.scn.deviation.ms

Calculates the end SCN of log mining sessions as the approximate SCN that corresponds to the point in time that is log.mining.end.scn.deviation.ms milliseconds before the current SCN obtained from the database. The default value is 3 seconds on RAC environments and 0 seconds on non-RAC environments. This configuration is applicable only for Oracle database versions 19c and later. Setting this configuration to a lower value on a RAC environment introduces the potential for data loss at high load. A higher value increases the end-to-end latency for change events.

  • Type: long
  • Default: 0
  • Valid Values: [0,…,60000]
  • Importance: medium
log.mining.archive.destination.name

The name of the archive log destination to use when mining archived redo logs. For example, when configured with LOG_ARCHIVE_DEST_2, the connector exclusively refers to the second destination for retrieving archived redo logs. This is only applicable for Oracle database versions 19c and later.

  • Type: string
  • Default: “”
  • Importance: low
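
For example, to mine archived redo logs exclusively from the second archive log destination mentioned above:

"log.mining.archive.destination.name": "LOG_ARCHIVE_DEST_2"
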
use.transaction.begin.for.mining.session

Sets the start SCN for the log mining session to the start SCN of the oldest relevant open transaction, if one exists. A relevant transaction is one that has changes to tables that the connector is set up to capture. Confluent recommends setting this to true when connecting to Oracle Real Application Clusters (RAC) databases or if large object (LOB) datatype support is enabled (using enable.large.lob.object.support). This configuration is applicable only for Oracle database versions 19c and later.

  • Type: boolean
  • Default: false
  • Importance: medium
log.mining.transaction.age.threshold.ms

Specifies the threshold (in milliseconds) for transaction age. Transaction age is defined as the duration the transaction has been open on the database. If the transaction age exceeds this threshold, an action is taken depending on the value set for the log.mining.transaction.threshold.breached.action configuration. The default value is -1, which means that a transaction is retained in the buffer until the connector receives the commit or rollback event for the transaction. This configuration is applicable only when use.transaction.begin.for.mining.session is set to true.

  • Type: long
  • Default: -1
  • Importance: medium
log.mining.transaction.threshold.breached.action

Specifies the action to take when an active transaction exceeds the threshold defined using the log.mining.transaction.age.threshold.ms configuration. When set to discard, the connector drops long-running transactions that exceed the threshold age from the buffer and skips emitting any records associated with these transactions. When set to warn, the connector logs a warning that mentions the oldest transaction that exceeds the threshold.

  • Type: string
  • Default: warn
  • Valid Values: discard, warn
  • Importance: medium

Connection details

query.timeout.ms

The timeout in milliseconds for any query submitted to Oracle. The default is 5 minutes (300000 milliseconds). If set to a negative value, the connector does not enforce a timeout on queries.

  • Type: long
  • Default: 300000 (5 minutes)
  • Importance: medium
max.batch.size

The maximum number of records that will be returned by the connector to Connect. The connector may still return fewer records if no additional records are available.

  • Type: int
  • Default: 1000
  • Valid Values: [100,…,10000]
  • Importance: medium
poll.linger.ms

The maximum time to wait for a record before returning an empty batch. The call to poll can return early before poll.linger.ms expires if max.batch.size records are received.

  • Type: long
  • Default: 5000 (5 seconds)
  • Valid Values: [0,…,300000]
  • Importance: medium
max.buffer.size

The maximum number of records from all snapshot threads and from the redo log that can be buffered into batches. The default of 0 means a buffer size will be computed from the maximum batch size and number of threads.

  • Type: int
  • Default: 0
  • Valid Values: [0,…,10000]
  • Importance: low
redo.log.poll.interval.ms

The interval between polls to retrieve the database redo log events. This has no effect when using Oracle database versions prior to 19c.

  • Type: long
  • Default: 500
  • Valid Values: [500,…,60000]
  • Importance: medium
snapshot.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching table rows in a snapshot. A value of 0 disables this hint.

  • Type: int
  • Default: 2000
  • Valid Values: [0,…,10000]
  • Importance: medium
redo.log.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. A value of 0 disables this hint. When continuous mine is available (database versions before Oracle 19c), the mining query from the connector waits until the number of rows available from the redo log is at least the value specified for fetch size before returning the results.

  • Type: int
  • Default: 5000
  • Valid Values: [0,…,10000]
  • Importance: medium
redo.log.row.poll.fields.include

A comma-separated list of fields from the V$LOGMNR_CONTENTS view to include in the redo log events.

  • Type: list
  • Default: “”
  • Importance: low
redo.log.row.poll.fields.exclude

A comma-separated list of fields from the V$LOGMNR_CONTENTS view to exclude from the redo log events.

  • Type: list
  • Default: “”
  • Importance: low
redo.log.row.poll.username.exclude

A comma-separated list of database usernames. When this property is set, the connector captures changes only from database users that are not specified in this list. You cannot set this property along with the redo.log.row.poll.username.include property.

  • Type: list
  • Default: “”
  • Importance: low
redo.log.row.poll.username.include

A comma-separated list of database usernames. When this property is set, the connector captures changes only from the specified set of database users. You cannot set this property along with the redo.log.row.poll.username.exclude property.

  • Type: list
  • Default: “”
  • Importance: low
oracle.validation.result.fetch.size

The fetch size used when querying the database for validations, such as retrieving the list of tables and validating the supplemental logging level.

  • Type: int
  • Default: 5000
  • Importance: low

Output messages

table.topic.name.template

The template that defines the name of the Kafka topic where the change event is written. The value can be a constant if the connector writes all change events from all captured tables to one topic. The value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, and ${connectorName}. This can be left blank only if the connector writes events only to the redo log topic and not to the table change event topics. Special characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable.

  • Type: string
  • Default: ${databaseName}.${schemaName}.${tableName}
  • Importance: high
redo.log.corruption.topic

The name of the Kafka topic to which the connector will record events that describe the information about corruption in the Oracle redo log, and which signify missed data. This can optionally use the template variables ${connectorName}, ${databaseName}, and ${schemaName}. A blank topic name (the default) signals that this information should not be written to Kafka.

  • Type: string
  • Default: “”
  • Importance: high
redo.log.topic.name

The template for the name of the Kafka topic to which the connector will record all raw redo log events. This can optionally use the template variables ${connectorName}, ${databaseName}, and ${schemaName}.

  • Type: string
  • Default: ${connectorName}-${databaseName}-redo-log
  • Importance: high
key.template

The template that defines the Kafka record key for each change event. By default, the record key contains a concatenated primary key value delimited by an underscore (_) character. Use ${primaryKeyStructOrValue} to contain either the sole column value of a single-column primary key or a STRUCT with the multi-column primary key fields (or null if the table has no primary or unique key). Use ${primaryKeyStruct} to always use a STRUCT for primary keys that have one or more columns (or null if there is no primary or unique key). If the template contains variables or string literals, the record key is the string with the variables resolved and replaced.

  • Type: string
  • Default: ${primaryKeyStructOrValue}
  • Importance: medium
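
For example, the following illustrative template combines variables with a literal underscore, producing a string key made up of the table name and the concatenated primary key values:

"key.template": "${tableName}_${primaryKey}"
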
oracle.dictionary.mode

The dictionary handling mode used by the connector. Options are auto, online, or redo_log. auto: The connector uses the dictionary from the online catalog until a DDL statement to evolve the table schema is encountered. At this point, the connector starts using the dictionary from archived redo logs. Once the DDL statement has been processed, the connector reverts to using the online catalog. Use this mode if DDL statements are expected. online: The connector always uses the online dictionary catalog. Use this mode if no DDL statements are expected. redo_log: The connector always uses the dictionary catalog from archived redo logs. Use this mode if you cannot access the online redo log. Note that any CDC events will be delayed until they are archived from online logs before the connector processes them.

  • Type: string
  • Default: auto
  • Valid Values: auto, online, redo_log
  • Importance: low
output.table.name.field

The name of the field in the change record written to Kafka that contains the fully-qualified name of the affected Oracle table. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the fully-qualified name of the affected Oracle table as a header with the given name.

  • Type: string
  • Default: table
  • Importance: low
output.commit.scn.field

The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) when the transaction was committed. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the Oracle system change number (SCN) when the transaction committed as a header with the given name.

  • Type: string
  • Default: “”
  • Importance: low
output.scn.field

The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) when this change was made. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the Oracle system change number (SCN) as a header with the given name.

  • Type: string
  • Default: scn
  • Importance: low
output.before.state.field

The name of the field in the change record written to Kafka that contains the before state of changed database rows for an update operation. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: “”
  • Importance: low
output.op.type.field

The name of the field in the change record written to Kafka that contains the operation type for this change event. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation type as a header with the given name.

  • Type: string
  • Default: op_type
  • Importance: low
output.op.ts.field

The name of the field in the change record written to Kafka that contains the operation timestamp for this change event. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation timestamp as a header with the given name.

  • Type: string
  • Default: op_ts
  • Importance: low
output.current.ts.field

The name of the field in the change record written to Kafka that contains the connector’s timestamp when this change event was processed. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the connector’s timestamp when this change event was processed as a header with the given name.

  • Type: string
  • Default: current_ts
  • Importance: low
output.row.id.field

The name of the field in the change record written to Kafka that contains the row ID of the table changed by this event. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the row ID of the table changed by this event as a header with the given name.

  • Type: string
  • Default: row_id
  • Importance: low
output.username.field

The name of the field in the change record written to Kafka that contains the name of the Oracle user that executed the transaction that resulted in this change. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the name of the Oracle user that executed the transaction that resulted in this change as a header with the given name.

  • Type: string
  • Default: username
  • Importance: low
output.redo.field

The name of the field in the change record written to Kafka that contains the original redo DML statement from which this change record was created. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the original redo DML statement from which this change record was created as a header with the given name.

  • Type: string
  • Default: “”
  • Importance: low
output.undo.field

The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change and represents the “before” state of the row. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the original undo DML statement that effectively undoes this change and represents the “before” state of the row as a header with the given name.

  • Type: string
  • Default: “”
  • Importance: low
output.op.type.read.value

The value of the operation type for a read (snapshot) change event. By default this is “R”.

  • Type: string
  • Default: R
  • Importance: low
output.op.type.insert.value

The value of the operation type for an insert change event. By default this is “I”.

  • Type: string
  • Default: I
  • Importance: low
output.op.type.update.value

The value of the operation type for an update change event. By default this is “U”.

  • Type: string
  • Default: U
  • Importance: low
output.op.type.delete.value

The value of the operation type for a delete change event. By default this is “D”.

  • Type: string
  • Default: D
  • Importance: low
lob.topic.name.template

The template that defines the name of the Kafka topic to which LOB objects should be written. The value can be a constant if all LOB objects from all captured tables are to be written to one topic, or the value can include any supported template variables, including ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, etc. The default is empty, which will ignore all LOB type columns if any exist on captured tables. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. Any character that is not a valid character for topic name is replaced by an underscore in the topic name.

  • Type: string
  • Default: “”
  • Importance: low
snapshot.by.table.partitions

Whether the connector should perform snapshots on each table partition if the table is defined to use partitions. This is false by default, meaning that one snapshot is performed on each table in its entirety.

  • Type: boolean
  • Default: false
  • Importance: low
snapshot.threads.per.task

The number of threads that can be used in each task to perform snapshots. This is only useful for a task if the number of tables assigned to that task is greater than this value.

  • Type: int
  • Default: 4
  • Valid Values: [1,…,10]
  • Importance: low
enable.large.lob.object.support

If true, the connector will support large LOB objects that are split across multiple redo log records. The connector will emit commit messages to the redo log topic and use these commit messages to track when a large LOB object can be emitted to the LOB topic.

  • Type: boolean
  • Default: false
  • Importance: low
numeric.mapping

Map NUMERIC values by precision and optionally scale to primitive or decimal types. Use none if all NUMERIC columns are to be represented by Connect’s DECIMAL logical type. Use best_fit_or_decimal if NUMERIC columns should be cast to Connect’s primitive type based on the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s DECIMAL logical type is used instead, and the values are represented in binary form within the change events. Use best_fit_or_double if NUMERIC columns should be cast to Connect’s primitive type based on the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s FLOAT64 type is used instead. Use best_fit_or_string if NUMERIC columns should be cast to Connect’s primitive type based on the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s STRING type is used instead. Use precision_only to map NUMERIC columns based only on the column’s precision, assuming the column’s scale is 0. The none option is the default but may lead to serialization issues since Connect’s DECIMAL type is mapped to its binary representation. One of the best_fit_or options is often preferred. For backward compatibility reasons, the best_fit option is also available; it behaves the same as best_fit_or_decimal. This would require deletion of the table topic and the registered schemas if you are using a non-JSON value.converter.

  • Type: string
  • Default: none
  • Valid Values: best_fit, best_fit_or_decimal, best_fit_or_double, best_fit_or_string, none, precision_only
  • Importance: low
numeric.default.scale

The default scale to use for numeric types when the scale cannot be determined.

  • Type: int
  • Default: 127
  • Valid Values: [-127,…,127]
  • Importance: low
oracle.date.mapping

Map Oracle DATE values to Connect types. Use date if all DATE columns are to be represented by Connect’s Date logical type. Use timestamp if DATE columns should be cast to Connect’s Timestamp. Despite the name similarity, the Oracle DATE type has different semantics than the Connect Date type; timestamp is often preferred for semantic similarity.

  • Type: string
  • Default: timestamp
  • Valid Values: date, timestamp
  • Importance: low
output.data.key.format

Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: JSON
  • Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
  • Importance: high
output.data.value.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: JSON
  • Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
  • Importance: high

Number of tasks for this connector

tasks.max

Maximum number of tasks to use for this connector.

  • Type: int
  • Default: 2
  • Valid Values: [1,…,1000]
  • Importance: high

Template variables

The connector uses template variables to create the name of the Kafka topic and the record key for each of the change events. The variables are similar to the Oracle GoldenGate Kafka Connect template variables, which simplifies migrating from Oracle GoldenGate to this connector. Variables are resolved at the task level and table level.

Connector and task variables

Variable keyword Description
${connectorName} Resolves to the name of the connector.
${databaseName} Resolves to the database name.
${emptyString} Resolves to an empty string.
${staticMap[]} Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside of the square braces, in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]}.
${currentTimestamp} or ${currentTimestamp[]} Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp}, ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}
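
For example, ${staticMap[]} can route specific tables to explicitly named topics (the table and topic names below are illustrative):

table.topic.name.template=${staticMap[ADMIN.USERS=users-topic,ADMIN.ORDERS=orders-topic]}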

Table variables

Variable keyword Description
${schemaName} Resolves to the schema name for the table.
${tableName} Resolves to the short table name.
${fullyQualifiedTableName} Resolves to the fully-qualified table name including the period (.) delimiter between the schema and table names. For example, dbo.table1.

Column variables

Variable keyword Description
${columnName} Resolves to the column name.

Record variables

Variable keyword Description
${opType} Resolves to the type of the operation: READ, INSERT, UPDATE, DELETE, or TRUNCATE.
${opTimestamp} Resolves to the operation timestamp from the redo log.
${rowId} Resolves to the ID of the changed row.
${primaryKey} Resolves to the concatenated primary key values delimited by an underscore (_) character.
${primaryKeyStruct} Resolves to a STRUCT with fields for each of the primary key column values.
${primaryKeyStructOrValue} Resolves to a STRUCT with fields for each primary key column when the primary key contains two or more columns, or to the column value itself when the primary key contains a single column.
${scn} Resolves to the system change number (SCN) when the change was made.
${cscn} Resolves to the system change number (SCN) when the change was committed.
${rbaseq} Resolves to the sequence number associated with the Redo Block Address (RBA) of the redo record associated with the change.
${rbablk} Resolves to the RBA block number within the log file.
${rbabyte} Resolves to the RBA byte offset within the block.
${currentTimestamp} or ${currentTimestamp[]} Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp}, ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

Supported Data Types

The following table lists data types and the associated Connect mapping.

Oracle data type SQL type code Connect mapping
CHAR or CHARACTER 1 STRING
LONG 1 STRING
VARCHAR 12 STRING
NCHAR -15 STRING
NVARCHAR2 -9 STRING
RAW -3 BYTES
LONG RAW -1 BYTES
INT or INTEGER 2 DECIMAL
SMALLINT 2 DECIMAL
DEC or DECIMAL 2 DECIMAL
NUMBER 2 DECIMAL
NUMERIC 2 DECIMAL
DOUBLE PRECISION 6 FLOAT64
FLOAT 6 FLOAT64
REAL 6 FLOAT64
TIMESTAMP WITH TIME ZONE -101 TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE -102 TIMESTAMP
DATE 91 DATE
BLOB 2004 BYTES
CLOB 2005 BYTES
NCLOB 2011 BYTES
XMLTYPE 2009 BYTES

Note

The -101 and -102 codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE are Oracle-specific. BLOB, CLOB, NCLOB, and XMLTYPE are handled out-of-band with a separate LOB topic.

Troubleshooting and Tips

Note the following information.

Pre-19c database

Consider decreasing query.timeout.ms if the tables you are capturing records from do not have much activity.

Degraded condition with Error while polling for records

When this connector is created, you may see that the connector is degraded, with an error event similar to the following.

{
  "datacontenttype": "application/json",
  "data": {
    "level": "ERROR",
    "context": {
      "connectorId": "lcc-12abc3"
    },
    "summary": {
      "connectorErrorSummary": {
        "message": "Error while polling for records",
        "rootCause": "Failed to subscribe to the redo log topic 'lcc-22rwg2-ORCL-redo-log' even after waiting PT5M. Verify that this redo log topic exists in the brokers at SASL_SSL://<address>, and that the redo log reading task is able to produce to that topic."
      }
    }
  },
  "subject": "lcc-12abc3-llcc-12abc3-1",
  "specversion": "1.0",
  "id": "a12345bc-6d78-91e0-fg11-c3ac4f20220b",
  "source": "crn://confluent.cloud/connector=lcc-12abc3",
  "time": "2022-03-28T18:25:29.631Z",
  "type": "io.confluent.logevents.connect.TaskFailed"
}

When new inserts, updates, or deletes occur, this error may self-correct. You can also increase redo.log.startup.polling.limit.ms to a larger value, which causes the connector to wait longer for the redo log topic to be created.

Alternatively, you can create a redo log topic (for example, redo-log-topic) with one partition ahead of time and specify it with "redo.log.topic.name": "redo-log-topic" when you create the connector.
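
For example, either of the following adjustments (values shown are illustrative) addresses this condition:

"redo.log.startup.polling.limit.ms": "600000"
"redo.log.topic.name": "redo-log-topic"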

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
