Oracle CDC Source Connector for Confluent Cloud

Note

If you are installing the connector locally for Confluent Platform, see Oracle CDC Source for Confluent Platform.

The fully-managed Oracle CDC Source connector for Confluent Cloud captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics. The connector uses Oracle LogMiner to read the database redo log. The connector requires a database user with permissions to use LogMiner and permissions to select from all of the tables captured by the connector.

The connector can be configured to capture a subset of the tables in a single database, defined as all tables accessible by the user that match an include regular expression. It can also be configured to not capture tables that match a separate exclude regular expression.
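
For example, the following sketch (the schema and table names are placeholders) combines the two expressions to capture every table owned by C##MYUSER except tables whose names end in _AUDIT:

table.inclusion.regex=ORCLCDB[.]C##MYUSER[.].*
table.exclusion.regex=ORCLCDB[.]C##MYUSER[.].*_AUDIT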

The connector writes the changes from each of the tables to Kafka topics, where the table-to-topic mapping is determined by the table.topic.name.template connector configuration property. This property defaults to the dot-delimited fully-qualified name of the table (for example, database_name.schema_name.table_name).

The connector recognizes literals and several variables (for example, ${tableName} and ${schemaName}) to customize table-to-topic mapping. Variables are resolved at runtime. For example, the following configuration property results in changes to the ORCL.ADMIN.USERS table being written to the Kafka topic named my-prefix.ORCL.ADMIN.USERS.

table.topic.name.template=my-prefix.${databaseName}.${schemaName}.${tableName}

For a list of template variables, see Template variables.

The connector is designed to write all of the raw Oracle redo log records to one Kafka topic, logically referred to as the “redo log topic”. The redo.log.topic.name configuration property determines the name of this topic. The connector consumes this topic to identify and produce all of the table-specific events written to the table-specific topics. To write only to the redo log topic, without generating table-specific events, set the table.topic.name.template property to an empty string.
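
For example, the following sketch (the topic name my-oracle-redo-log is a placeholder) writes all raw redo log records to a custom redo log topic and, because the table-to-topic template is empty, produces no table-specific change events:

redo.log.topic.name=my-oracle-redo-log
table.topic.name.template=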

Limitations

Be sure to review the following information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Oracle CDC Source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in an Oracle database and then monitoring and recording all subsequent row-level changes.

Important

Before configuring the connector, see Oracle Database Prerequisites for Oracle database configuration information and post-configuration validation steps.

Prerequisites
  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create (see the example following this list) or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
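
For example, assuming an existing Kafka cluster with ID lkc-abc123 and a service account sa-l1r23m (both placeholder IDs), the following command creates an API key scoped to that cluster:

confluent api-key create --resource lkc-abc123 --service-account sa-l1r23m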

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the Oracle Database Source connector icon.


Step 4: Set up the connection.

Complete the following and click Continue.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  1. Enter a connector name.

  2. Select the way you want to provide Kafka Cluster credentials. You can either select a service account resource ID or you can enter an API key and secret (or generate these in the Cloud Console).

  3. Add the Database connection details.

    • Oracle server: The hostname or address for the Oracle server.
    • Oracle port: The port number used to connect to Oracle. Defaults to 1521.
    • Oracle SID: The Oracle system identifier (SID).
    • Oracle PDB: The Oracle PDB name. Set this only if using multi-tenant CDB/PDB architecture. By default, this is not set, which indicates that the tables to capture reside in the CDB root.
    • Oracle service: The Oracle service name. If set, the connector always connects to the database using this service name.
    • Oracle username: The name of the Oracle database user.
    • Oracle password: The password for the Oracle database user.
    • Trust store: The trust store file containing server CA certificate. Only required if using SSL to connect to the database.
    • Trust store password: The trust store password for the trust store. Only required when using SSL to connect to the database.
    • Enable Oracle FAN events: Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events are not used even if they are supported by the database. This should only be enabled when using Oracle RAC set up with FAN events. Enabling this feature may cause connection issues when the database is not set up to use FAN events.
  4. Add the Database details.

    • Table inclusion regex: The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, an example expression could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, the example expression could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers).
    • Table exclusion regex: The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, an example expression could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, it could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers). A blank regex (the default) indicates there are no exclusions.
    • Start from: What the connector should do when it starts for the first time. The value is either the literal snapshot (the default), the literal current, the literal force_current, an Oracle System Change Number (SCN), or a database timestamp in the form DD-MON-YYYY HH24:MI:SS. The snapshot literal instructs the connector to snapshot captured tables the first time it is started, then continue processing redo log events from the point in time when the snapshot was taken. The current literal instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current, but ignores any previously stored offsets when the connector is restarted. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs.
  5. Add the optional Connector details.

    • Emit tombstone on delete: If set to true, delete operations emit a tombstone record with null value. Defaults to false.
    • Behavior on dictionary mismatch: Specifies the behavior when the connector is not able to parse the value of a column due to a dictionary mismatch caused by a DDL statement. This can happen if the online dictionary mode is set, but the connector is streaming historical data recorded before the DDL changes occurred. This property defaults to fail (the connector task fails). The log option logs the unparsable record and skips the problematic record without failing the connector task.
    • Behavior on unparsable statement: Specifies the desired behavior when the connector encounters a SQL statement that could not be parsed. The default option fail causes the connector task to fail. The log option logs the unparsable statement and skips the problematic record without failing the connector task.
    • Database timezone: The default time zone the connector uses when parsing Oracle DATE and TIMESTAMP types for which time zone information is not available. Defaults to UTC. For example, if db.timezone=UTC, the data for both DATE and TIMESTAMP is parsed using the UTC time zone. This value has to be a valid java.util.TimeZone ID.
    • Database timezone for DATE type: The default time zone to use when the connector parses the Oracle DATE type and time zone information is not available. If set, this value overrides db.timezone for the DATE type. For example, if db.timezone=UTC and db.timezone.date=America/Los_Angeles, TIMESTAMP type data is parsed using the UTC time zone and DATE type data is parsed using the America/Los_Angeles time zone. This value has to be a valid java.util.TimeZone ID.
    • Redo log startup polling limit (ms): The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. Defaults to five minutes (or 300000 milliseconds). The maximum is one hour (or 3600000 milliseconds).
  6. Add the optional Connection details.

    • Query timeout (ms): The timeout in milliseconds for any query submitted to Oracle. The default is five minutes (or 300000 milliseconds). If set to a negative value, the connector does not enforce timeout on queries.
    • Maximum batch size: The maximum number of records returned by the connector to Kafka. The connector may return fewer records if no additional records are available.
    • Poll linger milliseconds: The maximum time to wait for a record before returning an empty batch. The default is five seconds (or 5000 milliseconds).
    • Maximum buffer size: The maximum number of records from all snapshot threads and from the redo log that can be buffered into batches. The default value 0 means a buffer size is computed from the maximum batch size and number of threads.
    • Database poll interval (ms): The interval between polls to retrieve the database redo log events. This has no effect when continuous mine is available and enabled. The default is 500 milliseconds, which is also the minimum value allowed.
    • Snapshot row fetch size: The number of rows to provide as a hint to the JDBC driver when fetching table rows in a snapshot. The default is 2000 rows. Use 0 to disable this hint.
    • Redo log fetch size: The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. The default is 5000 rows. Use 0 to disable this hint.
    • Included fields: Comma-separated list of the fields to include in the redo log.
    • Excluded fields: Comma-separated list of the fields to exclude from the redo log.
  7. Add the optional Output messages details.

    • Topic name template: The template that defines the name of the Kafka topic where the connector writes a change event. The value can be a constant if the connector writes all records from all captured tables to one topic. Or, the value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, and so on. The default is an empty string, which indicates that the connector is not producing change event records. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. You can leave this property empty only when a redo log topic is specified. If left empty, the connector only writes to the redo log topic.

    • Redo log corruption topic: The name of the Kafka topic where the connector records events that describe the information about corruption in the Oracle redo log, and which identifies missed data. Defaults to an empty string. Leaving this property empty signals that the connector should not write this information to Kafka.

    • Redo log topic: The name of the Kafka topic where the connector records all raw redo log events. If left empty, this property defaults to ${connectorName}-${databaseName}-redo-log. For example, lcc-mycdcconnector-myoracledb-redo-log.

      Note

      ACLs need to be set for output topics. For more information, see Oracle CDC Source connector ACLs.

    • Key template: The template that defines the Kafka record key for each change event. By default the key contains a struct with the primary key fields, or null if the table has no primary or unique key.

    • Dictionary mode: The dictionary handling mode used by the connector. Options are auto, online, or redo_log. Defaults to auto.

      • auto: The connector uses the dictionary from the online catalog until a DDL statement that evolves the table schema is encountered. When this occurs, the connector starts using the dictionary from the archived redo logs. Once the DDL statement has been processed, the connector reverts back to using the online catalog. Use this mode if DDL statements are expected.
      • online: The connector always uses the online dictionary catalog. Use this mode if no DDL statements are expected.
      • redo_log: The connector always uses the dictionary catalog from the archived redo logs. Use this mode if you cannot access the online redo log. Note that any CDC events are delayed and not processed by the connector until they are archived from the online logs.
    • Output table name field: The name of the field in the change record written to Kafka that contains the schema-qualified name of the affected Oracle table (for example, dbo.Customers). A blank value indicates that this field should not be included in the change records.

    • Output SCN field: The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) when this change was made. Leaving this empty indicates that this field should not be included in the change records.

    • Output operation type field: The name of the field in the change record written to Kafka that contains the operation type for this change event. Leaving this empty indicates that this field should not be included in the change records.

    • Output operation timestamp field: The name of the field in the change record written to Kafka that contains the operation timestamp for this change event. Leaving this empty indicates that this field should not be included in the change records.

    • Output current timestamp field: The name of the field in the change record written to Kafka that contains the connector’s timestamp that indicates when the change event was processed. Leaving this empty indicates that this field should not be included in the change records.

    • Output row ID field: The name of the field in the change record written to Kafka that contains the row ID of the table changed by this event. Leaving this empty indicates that this field should not be included in the change records.

    • Output username field: The name of the field in the change record written to Kafka that contains the name of the Oracle user who executed the transaction that resulted in this change. Leaving this empty indicates that this field should not be included in the change records.

    • Output redo field: The name of the field in the change record written to Kafka that contains the original redo DML statement from which this change record was created. Leaving this empty indicates that this field should not be included in the change records.

    • Output undo field: The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change, and which represents the “before” state of the row. Leaving this empty indicates that this field should not be included in the change records.

    • Output operation type read value: The value of the operation type for a read (snapshot) change event. By default this property value is set to R.

    • Output operation type insert value: The value of the operation type for an insert change event. By default this property value is set to I.

    • Output operation type update value: The value of the operation type for an update change event. By default this property value is set to U.

    • Output operation type delete value: The value of the operation type for a delete change event. By default this property value is set to D.

    • Output operation type truncate value: The value of the operation type for a truncate change event. By default this property value is set to T.

    • LOB topic template: The template that defines the name of the Kafka topic where the connector writes LOB objects. The value can be a constant if all LOB objects from all captured tables will be written to one topic, or the value can include any supported template variables, including ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, and so on. The default is empty, which ignores all LOB type columns if any exist on captured tables. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable.

    • Snapshot by partitions: Whether the connector should perform snapshots on each table partition if the table is defined to use partitions. By default, this is set to false, meaning that one snapshot is performed on each table in its entirety.

    • Snapshot threads per task: The number of threads that can be used in each task to perform snapshots. This is only used in each task where the value is larger than the number of tables assigned to that task. Defaults to 4.

    • Enable large LOB object support: If true, the connector will support large LOB objects that are split across multiple redo log records. The connector emits commit messages to the redo log topic and uses these commit messages to track when a large LOB object can be emitted to the LOB topic.

    • Map numeric values…: Map NUMERIC values by precision and optionally scale to primitive or decimal types. The none option is the default, but may lead to serialization issues since the Connect DECIMAL type is mapped to its binary representation. One of the best_fit_or... options is generally preferred. The following options can be used:

      • Use none if you want all NUMERIC columns to be represented by the Connect DECIMAL logical type.
      • Use best_fit_or_decimal if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect DECIMAL logical type is used instead.
      • Use best_fit_or_double if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect FLOAT64 type is used instead.
      • Use best_fit_or_string if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect STRING type is used instead.
      • Use precision_only to map NUMERIC columns based only on the column’s precision, assuming the column’s scale is 0.
      • For backward compatibility, the best_fit option is also available. It behaves the same as best_fit_or_decimal.
    • Numeric default scale: The default scale to use for numeric types when the scale cannot be determined.

    • Map Oracle DATE type to Connect data type: Map Oracle DATE values to Connect types.

    • Output Kafka record key format: Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

    • Output Kafka record value format: Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  8. Enter the number of tasks in use by the connector. More tasks may improve performance. Defaults to 2 tasks.

  9. Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details. For a few example SMTs, see SMT Examples. A short configuration sketch follows this list.
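
As a sketch of step 9, the following fragment (the transform alias maskUser and the field name USERNAME are placeholders) adds the standard Kafka Connect MaskField SMT to a connector configuration to blank out a sensitive field in each change record:

"transforms": "maskUser",
"transforms.maskUser.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskUser.fields": "USERNAME"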

See Configuration Properties for all properties and definitions.

Step 5: Launch the connector.

Verify the connection details by previewing the running configuration. Once you’ve validated that the properties are configured to your satisfaction, click Launch.

Tip

For information about previewing your connector output, see Connector Data Previews.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running. It may take a few minutes.

Step 7: Check the Kafka topic.

After the connector is running, verify that records are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.


Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

  • Make sure you have all your prerequisites completed.
  • The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.

Step 1: List the available connectors.

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

confluent connect plugin describe <connector-catalog-name>

For example:

confluent connect plugin describe OracleCdcSource

Example output:

The following are required configs:
connector.class : OracleCdcSource
name
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
oracle.server
oracle.sid
oracle.username
table.inclusion.regex

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "name": "OracleCdcSourceConnector_0",
  "config": {
    "connector.class": "OracleCdcSource",
    "name": "OracleCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**************************************************",
    "oracle.server": "database-5.abcdefg12345.us-west-2.rds.amazonaws.com",
    "oracle.port": "1521",
    "oracle.sid": "ORCL",
    "oracle.username": "admin",
    "oracle.password": "**********",
    "table.inclusion.regex": "ORCL[.]ADMIN[.]CUSTOMERS",
    "start.from": "SNAPSHOT",
    "query.timeout.ms": "60000",
    "redo.log.row.fetch.size": "1",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "lob.topic.name.template":"${databaseName}.${schemaName}.${tableName}.${columnName}",
    "numeric.mapping": "BEST_FIT_OR_DOUBLE",
    "output.data.key.format": "JSON_SR",
    "output.data.value.format": "JSON_SR",
    "tasks.max": "2"
  }
}

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "connector.class": Identifies the connector plugin name.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "table.inclusion.regex": The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, in addition to the example shown in the configuration above, an example expression could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, the example expression could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers).

  • "start.from": What the connector should do when it starts for the first time. The value is either the literal snapshot (the default), the literal current, the literal force_current, an Oracle System Change Number (SCN), or a database timestamp in the form DD-MON-YYYY HH24:MI:SS. The snapshot literal instructs the connector to snapshot captured tables the first time it is started, then continue processing redo log events from the point in time when the snapshot was taken. The current literal instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current, but ignores any previously stored offsets when the connector is restarted. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs.

  • "query.timeout.ms": The timeout in milliseconds for any query submitted to Oracle. The default is five minutes (or 300000 milliseconds). If set to a negative value, the connector does not enforce timeout on queries.

  • "redo.log.row.fetch.size": The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. The default is 5000 rows. Use 0 to disable this hint.

  • "table.topic.name.template": The template that defines the name of the Kafka topic where the connector writes a change event. The value can be a constant if the connector writes all records from all captured tables to one topic. Or, the value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, and so on. The default is an empty string, which indicates that the connector is not producing change event records. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. You can leave this property empty only when a redo log topic is specified. If left empty, the connector only writes to the redo log topic.

  • "numeric.mapping": Map NUMERIC values by precision and optionally scale to primitive or decimal types. The none option is the default, but may lead to serialization issues since the Connect DECIMAL type is mapped to its binary representation. One of the best_fit_or... options are generally preferred. The following options can be used:

    • Use none if you want all NUMERIC columns to be represented by the Connect DECIMAL logical type.
    • Use best_fit_or_decimal if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect DECIMAL logical type is used instead.
    • Use best_fit_or_double if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect FLOAT64 type is used instead.
    • Use best_fit_or_string if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect STRING type is used instead.
    • Use precision_only to map NUMERIC columns based only on the column’s precision, assuming the column’s scale is 0.
    • For backward compatibility, the best_fit option is also available. It behaves the same as best_fit_or_decimal.
  • "output.data.key.format": Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • "output.data.value.format": Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • "tasks.max": Sets the maximum number of tasks in use by the connector. More tasks may improve performance. Defaults to 2 tasks.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs. For a few example SMTs, see SMT Examples.

See Configuration Properties for all properties and definitions.

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent connect create --config <file-name>.json

For example:

confluent connect create --config oracle-cdc-source.json

Example output:

Created connector OracleCdcSource_0 lcc-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

confluent connect list

Example output:

ID          |            Name       | Status  |  Type
+-----------+-----------------------+---------+-------+
lcc-ix4dl   | OracleCdcSource_0     | RUNNING | source

Step 6: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.


Configuration Properties

The following connector configuration properties are used for this connector.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key
  • Type: password
  • Importance: high
kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with the Kafka cluster.

  • Type: string
  • Importance: high
kafka.api.secret
  • Type: password
  • Importance: high

How should we connect to your database?

oracle.server

The hostname or address for the Oracle server.

  • Type: string
  • Valid Values: Must match the regex ^[^\?=%&\(\)]*$
  • Importance: high
oracle.port

The port number used to connect to Oracle.

  • Type: int
  • Default: 1521
  • Valid Values: [0,…,65535]
  • Importance: high
oracle.sid

The Oracle system identifier (SID).

  • Type: string
  • Valid Values: Must match the regex ^[a-zA-Z][a-zA-Z0-9$#_]*$
  • Importance: high
oracle.pdb.name

The Oracle PDB name. Set this only if using multi-tenant CDB/PDB architecture. By default, this is not set, which indicates that the tables to capture reside in the CDB root.

  • Type: string
  • Valid Values: Must match the regex ^[a-zA-Z][a-zA-Z0-9$#_]*$
  • Importance: high
oracle.service.name

The Oracle service name. If set, the connector always connects to the database using the provided service name.

  • Type: string
  • Valid Values: Must match the regex ^[a-zA-Z][a-zA-Z0-9$#_]*$
  • Importance: low
oracle.username

The name of the Oracle database user.

  • Type: string
  • Valid Values: Must match the regex ^[^\?=%&\(\)]*$
  • Importance: high
oracle.password

The password for the Oracle database user.

  • Type: password
  • Importance: high
ssl.truststorefile

The trust store file containing the server CA certificate. Only required when using SSL to connect to the database.

  • Type: password
  • Default: [hidden]
  • Importance: low
ssl.truststorepassword

The password for the trust store containing the server CA certificate. Only required when using SSL to connect to the database.

  • Type: password
  • Default: [hidden]
  • Importance: low
oracle.fan.events.enable

Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events will not be used even if they are supported by the database. This should only be enabled when using Oracle RAC set up with FAN events. Enabling this feature may cause connection issues when the database is not set up to use FAN events.

  • Type: boolean
  • Default: false
  • Importance: low

Database details

table.inclusion.regex

The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, it could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, it could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers).

  • Type: string
  • Importance: high
table.exclusion.regex

The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, it could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, it could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers). A blank regex (the default) implies no exclusions.

  • Type: string
  • Default: “”
  • Importance: high
start.from

What the connector should do when it starts for the first time. The value is either the literal snapshot (default), the literal current, the literal force_current, an Oracle System Change Number (SCN), or a database timestamp in the form DD-MON-YYYY HH24:MI:SS. The snapshot literal instructs the connector to snapshot captured tables the first time it is started, then continue processing redo log events from the point in time when the snapshot was taken. The current literal instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current but it will ignore any previously stored offsets when the connector is restarted. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs.

  • Type: string
  • Default: snapshot
  • Valid Values: current, force_current, snapshot
  • Importance: medium

Connector details

emit.tombstone.on.delete

If true, delete operations emit a tombstone record with null value.

  • Type: boolean
  • Default: false
  • Importance: low
behavior.on.dictionary.mismatch

Specifies the desired behavior when the connector is not able to parse the value of a column due to a dictionary mismatch caused by a DDL statement. This can happen if the online dictionary mode is specified but the connector is streaming historical data recorded before the DDL changes occurred. The default option fail will cause the connector task to fail. The log option will log the unparsable record and skip the problematic record without failing the connector task.

  • Type: string
  • Default: fail
  • Valid Values: fail, log
  • Importance: low
behavior.on.unparsable.statement

Specifies the desired behavior when the connector encounters a SQL statement that could not be parsed. The default option fail will cause the connector task to fail. The log option will log the unparsable statement and skip the problematic record without failing the connector task.

  • Type: string
  • Default: fail
  • Valid Values: fail, log
  • Importance: low
db.timezone

Default timezone to assume when parsing Oracle DATE and TIMESTAMP types for which timezone info is not available. For example, if db.timezone=UTC, data of both DATE and TIMESTAMP will be parsed as if in UTC timezone. The value has to be a valid java.util.TimeZone ID.

  • Type: string
  • Default: UTC
  • Importance: low
db.timezone.date

Default timezone to assume when parsing the Oracle DATE type for which timezone info is not available. If this is set, the value overrides db.timezone for the DATE type. For example, if db.timezone=UTC and db.timezone.date=America/Los_Angeles, data of TIMESTAMP type will be parsed as if in UTC timezone, and data of DATE type will be parsed as if in America/Los_Angeles timezone. The value has to be a valid java.util.TimeZone ID.

  • Type: string
  • Importance: low
redo.log.startup.polling.limit.ms

The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. The default is 5 minutes (or 300000 milliseconds). The maximum is 1 hour (or 3600000 milliseconds).

  • Type: long
  • Default: 300000 (5 minutes)
  • Valid Values: [0,…,3600000]
  • Importance: low

Connection details

query.timeout.ms

The timeout in milliseconds for any query submitted to Oracle. The default is 5 minutes (or 300000 milliseconds). If set to a negative value, the connector will not enforce a timeout on queries.

  • Type: long
  • Default: 300000 (5 minutes)
  • Importance: medium
max.batch.size

The maximum number of records that will be returned by the connector to Connect. The connector may still return fewer records if no additional records are available.

  • Type: int
  • Default: 1000
  • Valid Values: [100,…,10000]
  • Importance: medium
poll.linger.ms

The maximum time to wait for a record before returning an empty batch. The default is 5 seconds.

  • Type: long
  • Default: 5000 (5 seconds)
  • Valid Values: [0,…,300000]
  • Importance: medium
max.buffer.size

The maximum number of records from all snapshot threads and from the redo log that can be buffered into batches. The default of 0 means a buffer size will be computed from the maximum batch size and number of threads.

  • Type: int
  • Default: 0
  • Valid Values: [0,…,10000]
  • Importance: low
redo.log.poll.interval.ms

The interval between polls to retrieve the database redo log events. This has no effect when continuous mine is available and enabled. The default is 500 milliseconds, which is also the minimum value allowed.

  • Type: long
  • Default: 500
  • Valid Values: [500,…,60000]
  • Importance: medium
snapshot.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching table rows in a snapshot. A value of 0 disables this hint.

  • Type: int
  • Default: 2000
  • Valid Values: [0,…,10000]
  • Importance: medium
redo.log.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. A value of 0 disables this hint.

  • Type: int
  • Default: 5000
  • Valid Values: [0,…,10000]
  • Importance: medium
redo.log.row.poll.fields.include

Comma-separated list of the fields to include in the redo log.

  • Type: list
  • Default: “”
  • Importance: low
redo.log.row.poll.fields.exclude

Comma-separated list of the fields to exclude from the redo log.

  • Type: list
  • Default: “”
  • Importance: low

Output messages

table.topic.name.template

The template that defines the name of the Kafka topic to which the change event is to be written. The value can be a constant if all records from all captured tables are to be written to one topic, or the value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, etc. The default is an empty string, which indicates that the connector is not producing change event records. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. May be blank only when a redo log topic is specified, in which case the connector will only write to the redo log topic.

  • Type: string
  • Default: “”
  • Importance: high
redo.log.corruption.topic

The name of the Kafka topic to which the connector will record events that describe the information about corruption in the Oracle redo log, and which signify missed data. A blank topic name (the default) signals that this information should not be written to Kafka.

  • Type: string
  • Default: “”
  • Importance: high
redo.log.topic.name

The name of the Kafka topic to which the connector will record all raw redo log events.

  • Type: string
  • Default: ${connectorName}-${databaseName}-redo-log
  • Importance: high
key.template

The template that defines the Kafka record key for each change event. By default the key will contain a struct with the primary key fields, or null if the table has no primary or unique key.

  • Type: string
  • Default: ${primaryKeyStructOrValue}
  • Importance: medium
oracle.dictionary.mode

The dictionary handling mode used by the connector. One of auto, online, or redo_log. auto: The connector uses the dictionary from the online catalog until a DDL statement that evolves the table schema is encountered, at which point the connector starts using the dictionary from the archived redo logs. Once the DDL statement has been processed, the connector reverts to using the online catalog. Use this mode if DDL statements are expected. online: The connector always uses the online dictionary catalog. Use this mode if no DDL statements are expected. redo_log: The connector always uses the dictionary catalog from the archived redo logs. Use this mode if you cannot access the online redo log. Note that any CDC events are delayed and not processed by the connector until they are archived from the online logs.

  • Type: string
  • Default: auto
  • Valid Values: auto, online, redo_log
  • Importance: low
output.table.name.field

The name of the field in the change record written to Kafka that contains the schema-qualified name of the affected Oracle table (e.g., dbo.Customers). A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: table
  • Importance: low
output.scn.field

The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) when this change was made. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: scn
  • Importance: low
output.op.type.field

The name of the field in the change record written to Kafka that contains the operation type for this change event. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: op_type
  • Importance: low
output.op.ts.field

The name of the field in the change record written to Kafka that contains the operation timestamp for this change event. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: op_ts
  • Importance: low
output.current.ts.field

The name of the field in the change record written to Kafka that contains the connector’s timestamp when this change event was processed. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: current_ts
  • Importance: low
output.row.id.field

The name of the field in the change record written to Kafka that contains the row ID of the table changed by this event. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: row_id
  • Importance: low
output.username.field

The name of the field in the change record written to Kafka that contains the name of the Oracle user that executed the transaction that resulted in this change. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: username
  • Importance: low
output.redo.field

The name of the field in the change record written to Kafka that contains the original redo DML statement from which this change record was created. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: “”
  • Importance: low
output.undo.field

The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change and represents the “before” state of the row. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: “”
  • Importance: low
output.op.type.read.value

The value of the operation type for a read (snapshot) change event. By default this is “R”.

  • Type: string
  • Default: R
  • Importance: low
output.op.type.insert.value

The value of the operation type for an insert change event. By default this is “I”.

  • Type: string
  • Default: I
  • Importance: low
output.op.type.update.value

The value of the operation type for an update change event. By default this is “U”.

  • Type: string
  • Default: U
  • Importance: low
output.op.type.delete.value

The value of the operation type for a delete change event. By default this is “D”.

  • Type: string
  • Default: D
  • Importance: low
output.op.type.truncate.value

The value of the operation type for a truncate change event. By default this is “T”.

  • Type: string
  • Default: T
  • Importance: low
lob.topic.name.template

The template that defines the name of the Kafka topic to which LOB objects should be written. The value can be a constant if all LOB objects from all captured tables are to be written to one topic, or the value can include any supported template variables, including ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, etc. The default is empty, which will ignore all LOB type columns if any exist on captured tables. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable.

  • Type: string
  • Default: “”
  • Importance: low
snapshot.by.table.partitions

Whether the connector should perform snapshots on each table partition if the table is defined to use partitions. This is false by default, meaning that one snapshot is performed on each table in its entirety.

  • Type: boolean
  • Default: false
  • Importance: low
snapshot.threads.per.task

The number of threads that can be used in each task to perform snapshots. This is only used in each task when the value is larger than the number of tables assigned to that task. The default is 4.

  • Type: int
  • Default: 4
  • Valid Values: [1,…,10]
  • Importance: low
enable.large.lob.object.support

If true, the connector will support large LOB objects that are split across multiple redo log records. The connector will emit commit messages to the redo log topic and use these commit messages to track when a large LOB object can be emitted to the LOB topic.

  • Type: boolean
  • Default: false
  • Importance: low
numeric.mapping

Map NUMERIC values by precision and optionally scale to primitive or decimal types. Use none if all NUMERIC columns are to be represented by Connect’s DECIMAL logical type. Use best_fit_or_decimal if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s DECIMAL logical type will be used instead. Use best_fit_or_double if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s FLOAT64 type will be used instead. Use best_fit_or_string if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s STRING type will be used instead. Use precision_only to map NUMERIC columns based only on the column’s precision, assuming that the column’s scale is 0. The none option is the default, but may lead to serialization issues since Connect’s DECIMAL type is mapped to its binary representation. One of the best_fit_or options will often be preferred. For backwards compatibility reasons, the best_fit option is also available. It behaves the same as best_fit_or_decimal.

  • Type: string
  • Default: none
  • Valid Values: best_fit, best_fit_or_decimal, best_fit_or_double, best_fit_or_string, none, precision_only
  • Importance: low
numeric.default.scale

The default scale to use for numeric types when the scale cannot be determined.

  • Type: int
  • Default: 127
  • Valid Values: [-127,…,127]
  • Importance: low
oracle.date.mapping

Map Oracle DATE values to Connect types. Use date if all DATE columns are to be represented by Connect’s Date logical type. Use timestamp if DATE columns should be cast to Connect’s Timestamp. Despite the name similarity, the Oracle DATE type has different semantics than the Connect Date type, so timestamp is often preferred for semantic similarity.

  • Type: string
  • Default: timestamp
  • Valid Values: date, timestamp
  • Importance: low
output.data.key.format

Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: JSON
  • Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
  • Importance: high
output.data.value.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: JSON
  • Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
  • Importance: high

Number of tasks for this connector

tasks.max

Maximum number of tasks to use for this connector.

  • Type: int
  • Default: 2
  • Valid Values: [1,…,1000]
  • Importance: high

Template variables

The connector uses template variables to create the name of the Kafka topic and the record key for each of the change events. The variables are similar to the Oracle GoldenGate Kafka Connect template variables which simplify migrating from Oracle GoldenGate to this connector. Variables are resolved at the task level and table level.

Connector and task variables

Variable keyword Description
${connectorName} Resolves to the name of the connector.
${databaseName} Resolves to the database name.
${emptyString} Resolves to an empty string.
${staticMap[]} Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside of the square braces, in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]}.
${currentTimestamp} or ${currentTimestamp[]} Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp}, ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}
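
For example, the following sketch (schema, table, and topic names are placeholders) uses the ${staticMap[]} variable to route two tables to explicitly named topics:

table.topic.name.template=${staticMap[ADMIN.USERS=users-topic,ADMIN.ORDERS=orders-topic]}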

Table variables

Variable keyword Description
${schemaName} Resolves to the schema name for the table.
${tableName} Resolves to the short table name.
${fullyQualifiedTableName} Resolves to the fully-qualified table name including the period (.) delimiter between the schema and table names. For example, dbo.table1.

Column variables

Variable keyword Description
${columnName} Resolves to the column name.

Record variables

Variable keyword Description
${opType} Resolves to the type of the operation: READ, INSERT, UPDATE, DELETE, or TRUNCATE.
${opTimestamp} Resolves to the operation timestamp from the redo log.
${rowId} Resolves to the ID of the changed row.
${primaryKey} Resolves to the concatenated primary key values delimited by an underscore (_) character.
${primaryKeyStruct} Resolves to a STRUCT with fields for each of the primary key column values.
${primaryKeyStructOrValue} Resolves to either a STRUCT with fields for each of the primary key columns when the key has two or more columns, or the column value itself if the primary key contains a single column.
${scn} Resolves to the system change number (SCN) when the change was made.
${cscn} Resolves to the system change number (SCN) when the change was committed.
${rbaseq} Resolves to the sequence number associated with the Redo Block Address (RBA) of the redo record associated with the change.
${rbablk} Resolves to the RBA block number within the log file.
${rbabyte} Resolves to the RBA byte offset within the block.
${currentTimestamp} or ${currentTimestamp[]} Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp}, ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}
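
As a sketch only, a key template can combine several record variables. For example, the following template produces keys that concatenate the primary key value(s) and the change's SCN, such as 1001_9067459 (values illustrative):

key.template=${primaryKey}_${scn}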

Supported Data Types

The following table lists data types and the associated Connect mapping.

Oracle data type SQL type code Connect mapping
CHAR or CHARACTER 1 STRING
LONG 1 STRING
VARCHAR 12 STRING
VARCHAR2 12 STRING
NCHAR -15 STRING
NVARCHAR2 -9 STRING
RAW -3 BYTES
LONG RAW -1 BYTES
INT or INTEGER 2 DECIMAL
SMALLINT 2 DECIMAL
DEC or DECIMAL 2 DECIMAL
NUMBER 2 DECIMAL
NUMERIC 2 DECIMAL
DOUBLE PRECISION 6 FLOAT64
FLOAT 6 FLOAT64
REAL 6 FLOAT64
TIMESTAMP WITH TIME ZONE -101 TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE -102 TIMESTAMP
DATE 91 DATE
BLOB 2004 BYTES
CLOB 2005 BYTES
NCLOB 2011 BYTES
XMLTYPE 2009 BYTES

Note

The -101 and -102 codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE are Oracle-specific. BLOB, CLOB, NCLOB, and XMLTYPE are handled out-of-band with a separate LOB topic.
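
Combining the mapping properties described earlier, the following hedged fragment (values drawn from the options documented above) casts NUMBER columns to primitive types where precision and scale allow, falling back to FLOAT64, and maps DATE columns to the Connect Timestamp type:

"numeric.mapping": "BEST_FIT_OR_DOUBLE",
"oracle.date.mapping": "timestamp"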

Troubleshooting and Tips

Note the following information.

Pre-19c database

Consider decreasing query.timeout.ms if the tables you are capturing records from do not have much activity.

Degraded condition with Error while polling for records

When this connector is created, you may see that the connector is degraded, with the following error event.

{
  "datacontenttype": "application/json",
  "data": {
    "level": "ERROR",
    "context": {
      "connectorId": "lcc-12abc3"
    },
    "summary": {
      "connectorErrorSummary": {
        "message": "Error while polling for records",
        "rootCause": "Failed to subscribe to the redo log topic 'lcc-22rwg2-ORCL-redo-log' even after waiting PT5M. Verify that this redo log topic exists in the brokers at SASL_SSL://<address>, and that the redo log reading task is able to produce to that topic."
      }
    }
  },
  "subject": "lcc-12abc3-llcc-12abc3-1",
  "specversion": "1.0",
  "id": "a12345bc-6d78-91e0-fg11-c3ac4f20220b",
  "source": "crn://confluent.cloud/connector=lcc-12abc3",
  "time": "2022-03-28T18:25:29.631Z",
  "type": "io.confluent.logevents.connect.TaskFailed"
}

When you have new inserts, updates, or deletes, this error message may self-correct. You can also increase redo.log.startup.polling.limit.ms, which causes the connector to wait longer for the redo log topic to be created.

Alternatively, you can create a redo log topic with one partition ahead of time and specify it with "redo.log.topic.name": "redo-log-topic" when you create the connector.
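
For example, a minimal sketch using the Confluent CLI (the topic name redo-log-topic is a placeholder):

confluent kafka topic create redo-log-topic --partitions 1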