Oracle CDC Source Connector for Confluent Cloud¶
Oracle 11g, 12c and 18c Deprecation
Oracle discontinued support for the following Oracle Database versions:
- Version 11g on December 31, 2020
- Version 12c on March 31, 2022
- Version 18c on June 30, 2021
Oracle CDC connector support for each of these versions will reach end-of-life on June 30, 2025. Confluent currently supports Oracle Database versions 19c and later.
The fully-managed Oracle CDC Source connector for Confluent Cloud captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics. The connector uses Oracle LogMiner to read the database redo log. The connector requires a database user with permissions to use LogMiner and permissions to select from all of the tables captured by the connector.
Note
This is a Quick Start for the fully-managed cloud connector. If you are installing the connector locally for Confluent Platform, see Oracle CDC Source for Confluent Platform.
The connector can be configured to capture a subset of the tables in a single database, defined as all tables accessible by the user that match an include regular expression. It can also be configured to not capture tables that match a separate exclude regular expression.
The connector writes the changes from each table to Kafka topics, where the table-to-topic mapping is determined by the table.topic.name.template connector configuration property. This property defaults to the dot-delimited fully-qualified name of the table (for example, database_name.schema_name.table_name).
The connector recognizes literals and several variables (for example, ${tableName} and ${schemaName}) to customize table-to-topic mapping. Variables are resolved at runtime. For example, the following configuration property causes changes to the ORCL.ADMIN.USERS table to be written to the Kafka topic named my-prefix.ORCL.ADMIN.USERS.
table.topic.name.template=my-prefix.${databaseName}.${schemaName}.${tableName}
For a list of template variables, see Template variables.
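The following minimal Python sketch illustrates how a template such as the one above resolves to a topic name. It is only an approximation of the idea: the helper name resolve_topic_name is hypothetical, and Python's string.Template happens to share the ${name} placeholder syntax but not the connector's escaping rules.

```python
from string import Template


def resolve_topic_name(template: str, **variables: str) -> str:
    """Replace each ${name} placeholder with the matching keyword argument.

    Hypothetical helper for illustration; raises KeyError if the template
    uses a variable that was not supplied.
    """
    return Template(template).substitute(**variables)


topic = resolve_topic_name(
    "my-prefix.${databaseName}.${schemaName}.${tableName}",
    databaseName="ORCL",
    schemaName="ADMIN",
    tableName="USERS",
)
print(topic)  # my-prefix.ORCL.ADMIN.USERS
```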
The connector is designed to write all of the raw Oracle redo log records to one Kafka topic, logically referred to as the “redo log topic”. The redo.log.topic.name configuration property determines the name of this topic. The connector consumes this topic to identify and produce all of the table-specific events written to the table-specific topics. To write only to the redo log topic, without generating table-specific events for the table-specific topics, set the table.topic.name.template property to an empty string.
Limitations¶
Be sure to review the following information.
- For connector limitations, see Oracle CDC Source Connector limitations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud Oracle CDC Source connector. The quick start covers the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in an Oracle database and then monitor and record all subsequent row-level changes.
Important
- Before configuring the connector, see Oracle Database Prerequisites for Oracle CDC Source Connector for Confluent Cloud for Oracle database configuration information and post-configuration validation steps.
- See Troubleshooting Oracle CDC Source Connector for Confluent Cloud for additional information.
- Prerequisites
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.
- For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
See the Quick Start for Confluent Cloud for installation instructions.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 4: Enter the connector details¶
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the Add Oracle CDC Source Premium Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
- Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
- Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
- Click Continue.
- Add the following database connection details:
- Oracle server: The hostname or address for the Oracle server.
- Oracle port: The port number used to connect to Oracle. Defaults to 1521.
- Oracle SID: The Oracle system identifier (SID).
- Oracle PDB: The Oracle PDB name. Set this only if using multi-tenant CDB/PDB architecture. By default, this is not set, which indicates that the tables to capture reside in the CDB root.
- Oracle service: The Oracle service name. If set, the connector always connects to the database using this service name.
- Oracle username: The name of the Oracle database user.
- Oracle password: The password for the Oracle database user.
- Trust store: The trust store file containing server CA certificate. Only required if using SSL to connect to the database.
- Trust store password: The trust store password for the trust store. Only required when using SSL to connect to the database.
- Enable Oracle FAN events: Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events are not used even if they are supported by the database. This should only be enabled when using Oracle RAC set up with FAN events. Enabling this feature may cause connection issues when the database is not set up to use FAN events.
- Click Continue.
Add the following details:
- Table inclusion regex: The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, an example expression could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, the example expression could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers).
- Topic name template: The template that defines the name of the Kafka topic where the connector writes a change event. This property is blank by default, which indicates that the connector does not produce change event records and only writes records to the redo log topic. This property can be a constant (topic name) if the connector writes all records from all captured tables to one topic. Or, the value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, and ${connectorName}.
- Output Kafka record key format: Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Output Kafka record value format: Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
Table exclusion regex: The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema.
Start from: What the connector should do when it starts for the first time. The value is either the literal snapshot (the default), the literal current, the literal force_current, an Oracle System Change Number (SCN), or a database timestamp in the form DD-MON-YYYY HH24:MI:SS.
Dictionary mode: The dictionary handling mode used by the connector. Options are auto, online, or redo_log. Defaults to auto.
Redo log corruption topic: The name of the Kafka topic where the connector records events that describe the information about corruption in the Oracle redo log, and which identifies missed data.
Redo log topic: The name of the Kafka topic where the connector records all raw redo log events. If left empty, this property defaults to ${connectorName}-${databaseName}-redo-log. For example, lcc-mycdcconnector-myoracledb-redo-log.
Key template: The template that defines the Kafka record key for each change event. By default the key contains a struct with the primary key fields, or null if the table has no primary or unique key.
Output table name field: The name of the field in the change record written to Kafka that contains the schema-qualified name of the affected Oracle table (for example, dbo.Customers). A blank value indicates that this field should not be included in the change records.
Output SCN field: The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) when this change was made. Leaving this empty indicates that this field should not be included in the change records.
Output operation type field: The name of the field in the change record written to Kafka that contains the operation type for this change event. Leaving this empty indicates that this field should not be included in the change records.
Output operation timestamp field: The name of the field in the change record written to Kafka that contains the operation timestamp for this change event. Leaving this empty indicates that this field should not be included in the change records.
Output current timestamp field: The name of the field in the change record written to Kafka that contains the connector’s timestamp that indicates when the change event was processed. Leaving this empty indicates that this field should not be included in the change records.
Output row ID field: The name of the field in the change record written to Kafka that contains the row ID of the table changed by this event. Leaving this empty indicates that this field should not be included in the change records.
Output username field: The name of the field in the change record written to Kafka that contains the name of the Oracle user who executed the transaction that resulted in this change. Leaving this empty indicates that this field should not be included in the change records.
Output redo field: The name of the field in the change record written to Kafka that contains the original redo DML statement from which this change record was created. Leaving this empty indicates that this field should not be included in the change records.
Output undo field: The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change, and which represents the “before” state of the row. Leaving this empty indicates that this field should not be included in the change records.
Output operation type read value: The value of the operation type for a read (snapshot) change event. By default this property value is set to R.
Output operation type insert value: The value of the operation type for an insert change event. By default this property value is set to I.
Output operation type update value: The value of the operation type for an update change event. By default this property value is set to U.
Output operation type delete value: The value of the operation type for a delete change event. By default this property value is set to D.
Output operation type truncate value: The value of the operation type for a truncate change event. By default this property value is set to T.
LOB topic template: The template that defines the name of the Kafka topic where the connector writes LOB objects. The value can be a constant if all LOB objects from all captured tables will be written to one topic, or the value can include any supported template variables, including ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, and so on.
Snapshot by partitions: Whether the connector should perform snapshots on each table partition if the table is defined to use partitions. By default, this is set to false, meaning that one snapshot is performed on each table in its entirety.
Snapshot threads per task: The number of threads that can be used in each task to perform snapshots. This is only used in each task where the value is larger than the number of tables assigned to that task. Defaults to 4.
Enable large LOB object support: If true, the connector will support large LOB objects that are split across multiple redo log records. The connector emits commit messages to the redo log topic and uses these commit messages to track when a large LOB object can be emitted to the LOB topic.
Map numeric values…: Map NUMERIC values by precision and optionally scale to primitive or decimal types. The none option is the default, but may lead to serialization issues since the Connect DECIMAL type is mapped to its binary representation. One of the best_fit_or... options is generally preferred.
Numeric default scale: The default scale to use for numeric types when the scale cannot be determined.
Map Oracle DATE type to Connect data type: Map Oracle DATE values to Connect types.
Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details.
For all property values and definitions, see Configuration Properties.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of tasks, use the Range Slider to select the desired number of tasks.
- Click Continue.
Verify the connection details by previewing the running configuration.
Tip
For information about previewing your connector output, see Confluent Cloud Connector Data Previews.
After you’ve validated that the properties are configured to your satisfaction, click Launch.
The status for the connector should go from Provisioning to Running.
Important
- If the connector is not running, see Oracle Database Prerequisites for Oracle CDC Source Connector for Confluent Cloud and review the Oracle database configuration information and post-configuration validation steps.
- See Troubleshooting Oracle CDC Source Connector for Confluent Cloud for additional information.
Step 5: Check the Kafka topic¶
After the connector is running, verify that records are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
See also
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
Using the Confluent CLI¶
Complete the following steps to set up and run the connector using the Confluent CLI.
Note
- Make sure you have all your prerequisites completed.
- The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
For example:
confluent connect plugin describe OracleCdcSource
Example output:
The following are required configs:
connector.class : OracleCdcSource
name
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
oracle.server
oracle.sid
oracle.username
table.inclusion.regex
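As a rough illustration of the required properties listed in the example output, the following Python sketch checks a configuration dictionary before it is written to a file. The helper name missing_required and the placeholder values are hypothetical; the real validation is performed by Confluent Cloud when the connector is created.

```python
# Properties listed as required in the example output above.
BASE_REQUIRED = [
    "connector.class",
    "name",
    "oracle.server",
    "oracle.sid",
    "oracle.username",
    "table.inclusion.regex",
]
# Required only when kafka.auth.mode is KAFKA_API_KEY (the default mode).
API_KEY_REQUIRED = ["kafka.api.key", "kafka.api.secret"]


def missing_required(config: dict) -> list:
    """Return the required property names that are absent or empty."""
    required = list(BASE_REQUIRED)
    if config.get("kafka.auth.mode", "KAFKA_API_KEY") == "KAFKA_API_KEY":
        required += API_KEY_REQUIRED
    return [key for key in required if not config.get(key)]


example = {
    "connector.class": "OracleCdcSource",
    "name": "OracleCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<api-key>",        # placeholder value
    "kafka.api.secret": "<api-secret>",  # placeholder value
    "oracle.server": "db.example.com",   # placeholder host
    "oracle.sid": "ORCL",
    "oracle.username": "admin",
    "table.inclusion.regex": "ORCL[.]ADMIN[.]CUSTOMERS",
}
print(missing_required(example))  # []
```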
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
  "name": "OracleCdcSourceConnector_0",
  "config": {
    "connector.class": "OracleCdcSource",
    "name": "OracleCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**************************************************",
    "oracle.server": "database-5.abcdefg12345.us-west-2.rds.amazonaws.com",
    "oracle.port": "1521",
    "oracle.sid": "ORCL",
    "oracle.username": "admin",
    "oracle.password": "**********",
    "table.inclusion.regex": "ORCL[.]ADMIN[.]CUSTOMERS",
    "start.from": "SNAPSHOT",
    "query.timeout.ms": "60000",
    "redo.log.row.fetch.size": "1",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "lob.topic.name.template": "${databaseName}.${schemaName}.${tableName}.${columnName}",
    "numeric.mapping": "BEST_FIT_OR_DOUBLE",
    "output.data.key.format": "JSON_SR",
    "output.data.value.format": "JSON_SR",
    "tasks.max": "2"
  }
}
Note the following property definitions:
"name": Sets a name for your new connector.
"connector.class": Identifies the connector plugin name.
"kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:
confluent iam service-account list
For example:
confluent iam service-account list
   Id     | Resource ID |       Name        |    Description
+---------+-------------+-------------------+-------------------
   123456 | sa-l1r23m   | sa-1              | Service account 1
   789101 | sa-l4d56p   | sa-2              | Service account 2
"table.inclusion.regex": The regular expression that matches the fully-qualified names of the tables including SID or PDB and schema. For example, if you use a non-container database, in addition to the example shown in the configuration above, an example expression could be ORCLCDB[.]C##MYUSER[.](table1|orders|customers) or ORCLCDB.C##MYUSER.(table1|orders|customers). If you use a multi-tenant database, the example expression could be ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) or ORCLPDB1.C##MYUSER.(table1|orders|customers).
"start.from": What the connector should do when it starts for the first time. The value is either the literal snapshot (the default), the literal current, the literal force_current, an Oracle System Change Number (SCN), or a database timestamp in the form DD-MON-YYYY HH24:MI:SS. The snapshot literal instructs the connector to snapshot captured tables the first time it is started, then continue processing redo log events from the point in time when the snapshot was taken. The current literal instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current, but ignores any previously stored offsets when the connector is restarted. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs.
"query.timeout.ms": The timeout in milliseconds for any query submitted to Oracle. The default is five minutes (or 300000 milliseconds). If set to a negative value, the connector does not enforce timeout on queries.
"redo.log.row.fetch.size": The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. The default is 5000 rows. Use 0 to disable this hint.
"table.topic.name.template": The template that defines the name of the Kafka topic where the connector writes a change event. The value can be a constant if the connector writes all records from all captured tables to one topic. Or, the value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, and so on. The default is an empty string, which indicates that the connector is not producing change event records. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. You can leave this property empty only when a redo log topic is specified. If left empty, the connector only writes to the redo log topic.
"numeric.mapping": Map NUMERIC values by precision and optionally scale to primitive or decimal types. The none option is the default, but may lead to serialization issues since the Connect DECIMAL type is mapped to its binary representation. One of the best_fit_or... options is generally preferred. The following options can be used:
- Use none if you want all NUMERIC columns to be represented by the Connect DECIMAL logical type.
- Use best_fit_or_decimal if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect DECIMAL logical type is used instead.
- Use best_fit_or_double if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect FLOAT64 type is used instead.
- Use best_fit_or_string if NUMERIC columns should be cast to the Connect primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, the Connect STRING type is used instead.
- Use precision_only to map NUMERIC columns based only on the column’s precision, assuming the column’s scale is 0.
- For backward compatibility, the best_fit option is also available. It behaves the same as best_fit_or_decimal.
"output.data.key.format": Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
"output.data.value.format": Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
"tasks.max": Sets the maximum number of tasks in use by the connector. More tasks may improve performance. Defaults to 2 tasks.
Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs. For a few example SMTs, see SMT Examples for Oracle CDC Source Connector for Confluent Cloud.
See Configuration Properties for all properties and definitions.
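The start.from property described in this step accepts five kinds of value. The Python sketch below shows one way to tell them apart before submitting a configuration. It is not the connector's own parsing logic: the function name classify_start_from is hypothetical, it assumes the literals are matched case-insensitively (the example configuration uses "SNAPSHOT"), and it checks timestamps against the DD-MON-YYYY HH24:MI:SS form given in this step, with English month abbreviations.

```python
from datetime import datetime

# Literal values named in the start.from description.
LITERALS = {"snapshot", "current", "force_current"}


def classify_start_from(value: str) -> str:
    """Classify a start.from value as a literal, an SCN, or a timestamp.

    Returns the lowercase literal, "scn", or "timestamp".
    Raises ValueError for anything else.
    """
    text = value.strip()
    if text.lower() in LITERALS:
        return text.lower()
    # Assumption: an SCN is written as a plain non-negative integer.
    if text.isascii() and text.isdigit():
        return "scn"
    try:
        # Python equivalent of the DD-MON-YYYY HH24:MI:SS form.
        datetime.strptime(text, "%d-%b-%Y %H:%M:%S")
    except ValueError:
        raise ValueError(f"unrecognized start.from value: {value!r}") from None
    return "timestamp"


for candidate in ("SNAPSHOT", "force_current", "1234567", "01-JAN-2024 13:05:00"):
    print(candidate, "->", classify_start_from(candidate))
```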
Step 4: Load the properties file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file oracle-cdc-source.json
Example output:
Created connector OracleCdcSourceConnector_0 lcc-ix4dl
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID          | Name                       | Status  | Type
+-----------+----------------------------+---------+--------+
lcc-ix4dl   | OracleCdcSourceConnector_0 | RUNNING | source
Step 6: Check the Kafka topic¶
After the connector is running, verify that messages are populating your Kafka topic.
Important
- If the connector is not running, see Oracle Database Prerequisites for Oracle CDC Source Connector for Confluent Cloud and review the Oracle database configuration information and post-configuration validation steps.
- See Troubleshooting Oracle CDC Source Connector for Confluent Cloud for additional information.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
Configuration Properties¶
Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
Schema Config¶
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
How should we connect to your database?¶
oracle.server
The hostname or address for the Oracle server.
- Type: string
- Valid Values: Must match the regex
^[^\?=%&\(\)]*$
- Importance: high
oracle.port
The port number used to connect to Oracle.
- Type: int
- Default: 1521
- Valid Values: [1,…,65535]
- Importance: high
oracle.sid
The Oracle system identifier (SID) of a multi-tenant container database (CDB) or non-multitenant database where tables reside. Confluent recommends you use oracle.service.name to connect to the database using service names instead of using the SID. Maps to the SID parameter in the connect descriptor.
- Type: string
- Valid Values: Must match the regex
^[a-zA-Z][a-zA-Z0-9$#_]*$
- Importance: high
oracle.pdb.name
The name of the pluggable database (PDB). This is not required when tables reside in the CDB$ROOT database, or if you’re using a non-container database.
- Type: string
- Valid Values: Must match the regex
^([a-zA-Z][a-zA-Z0-9$#_]*)*$
- Importance: high
oracle.service.name
The Oracle service name. If set, the connector always connects to the database using the provided service name. The oracle.service.name maps to the SERVICE_NAME parameter in the connect descriptor. For the multitenant container database (CDB) or non-multitenant database, this does not need to be specified. Confluent recommends you set the oracle.service.name to the container database (CDB) service name when using a pluggable database (PDB). When this property is set, it is used in the connect descriptor instead of oracle.sid.
- Type: string
- Valid Values: Must match the regex
^([a-zA-Z][a-zA-Z0-9$#_.]*)*$
- Importance: low
oracle.username
The name of the Oracle database user.
- Type: string
- Valid Values: Must match the regex
^[^\?=%&\(\)]*$
- Importance: high
oracle.password
The password for the Oracle database user.
- Type: password
- Importance: high
ssl.truststorefile
The trust store containing server CA certificate. Only required when using SSL to connect to the database.
- Type: password
- Default: [hidden]
- Importance: low
ssl.truststorepassword
The password for the trust store containing the server CA certificate. Only required when using SSL to connect to the database.
- Type: password
- Default: [hidden]
- Importance: low
oracle.fan.events.enable
Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events will not be used even if they are supported by the database. This should only be enabled when using Oracle RAC set up with FAN events. Enabling this feature may cause connection issues when the database is not set up to use FAN events.
- Type: boolean
- Default: false
- Importance: low
Database details¶
table.inclusion.regex
The regular expression that matches the fully-qualified table names. The values are matched (case sensitive) with the object names stored in the data dictionary (uppercase unless created as a quoted identifier; database and PDB names are always stored as uppercase in the data dictionary). Ensure that the casing of the SID part of the identifier is consistent with the oracle.sid value specified. For a non-container database, the fully-qualified name includes the SID and schema name, for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\.MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (CDB), the fully-qualified name includes the SID and schema name, for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\.C##MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name, for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\.C##MYUSER\.(ORDERS|CUSTOMERS).
- Type: string
- Importance: high
table.exclusion.regex
The regular expression that matches the fully-qualified names of the tables that the connector should not capture. The values are matched (case sensitive) with the object names stored in the data dictionary (uppercase unless created as a quoted identifier; database and PDB names are always stored as uppercase in the data dictionary). Ensure that the casing of the SID part of the identifier is consistent with the oracle.sid value specified. For a non-container database, the fully-qualified name includes the SID and schema name, for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\.MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (CDB), the fully-qualified name includes the SID and schema name, for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\.C##MYUSER\.(ORDERS|CUSTOMERS). For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name, for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\.C##MYUSER\.(ORDERS|CUSTOMERS).
- Type: string
- Default: “”
- Importance: high
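The following is a minimal sketch for sanity-checking an include and exclude pattern pair against candidate table names before configuring the connector. It uses Python's re module rather than the connector's own regex engine, and it assumes that a name must match the whole pattern; the exclude pattern and the is_captured helper are hypothetical and only for illustration.

```python
import re

# Include pattern copied from the PDB example above; the exclude pattern is
# a made-up illustration that removes CUSTOMERS from the captured set.
include = re.compile(r"ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS)")
exclude = re.compile(r"ORCLPDB1[.]C##MYUSER[.]CUSTOMERS")


def is_captured(fq_name: str) -> bool:
    """True if the name matches the include pattern and not the exclude pattern.

    Assumes whole-string, case-sensitive matching.
    """
    return bool(include.fullmatch(fq_name)) and not exclude.fullmatch(fq_name)


for name in (
    "ORCLPDB1.C##MYUSER.ORDERS",
    "ORCLPDB1.C##MYUSER.CUSTOMERS",
    "orclpdb1.c##myuser.orders",  # wrong case, so it does not match
):
    print(name, is_captured(name))
```

The lowercase name is rejected because matching is case sensitive and the data dictionary normally stores unquoted identifiers in uppercase.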
oracle.supplemental.log.level
Database supplemental logging level for connector operation. If set to full, the connector validates that the supplemental logging level on the database is FULL and then captures snapshots and CDC events for the specified tables whenever table.topic.name.template is not set to "". When the level is set to msl, the connector does not capture CDC change events; rather, it only captures snapshots if table.topic.name.template is not set to "". Note that this setting is irrelevant if table.topic.name.template is set to "". In this case, only redo logs are captured. This setting defaults to the full supplemental logging level mode.
- Type: string
- Default: full
- Valid Values: full, msl
- Importance: medium
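The interaction between oracle.supplemental.log.level and table.topic.name.template described above can be summarized as a small decision function. This is only a restatement of the description, not connector code; the function name and the labels snapshot, cdc, and redo_log are hypothetical.

```python
def captured_outputs(log_level: str, table_topic_template: str) -> set:
    """Return what the description above says is captured for a given setup."""
    if log_level not in ("full", "msl"):
        raise ValueError("oracle.supplemental.log.level must be 'full' or 'msl'")
    if table_topic_template == "":
        # The logging level is irrelevant here: only redo logs are captured.
        return {"redo_log"}
    if log_level == "full":
        return {"snapshot", "cdc"}
    # msl: snapshots only, no CDC change events.
    return {"snapshot"}


template = "${databaseName}.${schemaName}.${tableName}"
for level in ("full", "msl"):
    print(level, sorted(captured_outputs(level, template)))
print("empty template", sorted(captured_outputs("full", "")))
```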
start.from
When starting for the first time, this is the position in the redo log where the connector should start. Specifies an Oracle System Change Number (SCN) or a database timestamp with the format yyyy-MM-dd HH:mm:SS in the database time zone. Defaults to the literal snapshot, which instructs the connector to perform an initial snapshot of each captured table before capturing changes. The literal current instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current, but it ignores any previously stored offsets when the connector is restarted. Use this option cautiously because it can result in losing changes between the SCN stored in the offsets and the current SCN. Use it only to recover the connector when the SCN stored in the offsets is no longer available in the Oracle archive logs. Every option other than force_current causes the connector to resume from the stored offsets in case of task restarts or rebalances.
- Type: string
- Default: snapshot
- Importance: medium
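As a rough illustration of the value forms that start.from accepts, the sketch below classifies a candidate value as one of the three literals, an SCN, or a timestamp. The classify_start_from helper is hypothetical, and it assumes the last field of the documented timestamp format holds seconds; the connector performs its own validation.

```python
from datetime import datetime

LITERALS = ("snapshot", "current", "force_current")


def classify_start_from(value: str) -> str:
    """Classify a start.from value as a literal, 'scn', or 'timestamp'."""
    if value in LITERALS:
        return value
    if value.isascii() and value.isdigit():
        # An SCN is treated here as a plain non-negative integer.
        return "scn"
    try:
        # Assumption: year-month-day hour:minute:second, 24-hour clock.
        datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        raise ValueError(f"unrecognized start.from value: {value!r}") from None
    return "timestamp"


for candidate in ("snapshot", "force_current", "2891744", "2024-01-15 08:30:00"):
    print(candidate, "->", classify_start_from(candidate))
```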
Connector details¶
emit.tombstone.on.delete
If true, delete operations emit a tombstone record with null value.
- Type: boolean
- Default: false
- Importance: low
behavior.on.dictionary.mismatch
Specifies the desired behavior when the connector is not able to parse the value of a column due to a dictionary mismatch caused by a DDL statement. This can happen if the online dictionary mode is specified but the connector is streaming historical data recorded before DDL changes occurred. The default option fail causes the connector task to fail. The log option logs the unparsable record and skips it without failing the connector task.
- Type: string
- Default: fail
- Valid Values: fail, log
- Importance: low
behavior.on.unparsable.statement
Specifies the desired behavior when the connector encounters a SQL statement that could not be parsed. The default option fail causes the connector task to fail. The log option logs the unparsable statement and skips the problematic record without failing the connector task.
- Type: string
- Default: fail
- Valid Values: fail, log
- Importance: low
db.timezone
Default timezone to assume when parsing Oracle DATE and TIMESTAMP types for which timezone information is not available. For example, if db.timezone=UTC, data for both DATE and TIMESTAMP is parsed as if in the UTC timezone. The value has to be a valid java.util.TimeZone ID.
- Type: string
- Default: UTC
- Importance: low
db.timezone.date
The default timezone to assume when parsing the Oracle DATE type for which timezone information is not available. If db.timezone.date is set, the value of db.timezone for the DATE type is overridden by the value in db.timezone.date. For example, if db.timezone=UTC and db.timezone.date=America/Los_Angeles, the data in TIMESTAMP is parsed as if it is in the UTC timezone, and the data in DATE is parsed as if it is in the America/Los_Angeles timezone. The value has to be a valid java.util.TimeZone ID.
- Type: string
- Importance: low
redo.log.startup.polling.limit.ms
The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. When this wait time expires, the connector moves to a failed state.
- Type: long
- Default: 300000 (5 minutes)
- Valid Values: [0,…,3600000]
- Importance: low
heartbeat.interval.ms
The interval in milliseconds at which the connector emits heartbeat records to a heartbeat topic named ${connectorName}-${databaseName}-heartbeat-topic. Heartbeats are useful for advancing the connector offsets so that they stay close to the latest SCN the connector has processed. The default is 0 milliseconds, which disables the heartbeat mechanism. Confluent recommends that you set heartbeat.interval.ms to a value on the order of minutes to hours in environments where the connector is configured to capture infrequently updated tables, so the source offsets can move forward. Otherwise, a task restart could cause the connector to fail with an ORA-01291 missing logfile error if the archived redo log file corresponding to the stored source offset has been purged from the database.
- Type: long
- Default: 0
- Valid Values: [0,…]
- Importance: low
log.mining.end.scn.deviation.ms
Calculates the end SCN of log mining sessions as the approximate SCN that corresponds to the point in time that is log.mining.end.scn.deviation.ms milliseconds before the current SCN obtained from the database. The default value is set to 3 seconds on RAC environments, and 0 seconds on non RAC environments. This configuration is applicable only for Oracle database versions 19c and later. Setting this configuration to a lower value on a RAC environment introduces the potential for data loss at high load. A higher value increases the end to end latency for change events.
- Type: long
- Default: 0
- Valid Values: [0,…,60000]
- Importance: medium
log.mining.archive.destination.name
The name of the archive log destination to use when mining archived redo logs. For example, when configured with LOG_ARCHIVE_DEST_2, the connector exclusively refers to the second destination for retrieving archived redo logs. This is only applicable for Oracle database versions 19c and later.
- Type: string
- Default: “”
- Importance: low
use.transaction.begin.for.mining.session
Sets the start SCN for the log mining session to the start SCN of the oldest relevant open transaction, if one exists. A relevant transaction is defined as one that has changes to tables that the connector is set up to capture. It is recommended to set this to true when connecting to Oracle Real Application Clusters (RAC) databases or if large object datatypes (LOB) support is enabled (using enable.large.lob.object.support). This configuration is applicable only for Oracle database versions 19c and later.
- Type: boolean
- Default: false
- Importance: medium
log.mining.transaction.age.threshold.ms
Specifies the threshold (in milliseconds) for transaction age. Transaction age is defined as the duration the transaction has been open on the database. If the transaction age exceeds this threshold, an action is taken depending on the value set for the log.mining.transaction.threshold.breached.action configuration. The default value is -1, which means that a transaction is retained in the buffer until the connector receives the commit or rollback event for the transaction. This configuration is applicable only when use.transaction.begin.for.mining.session is set to true.
- Type: long
- Default: -1
- Importance: medium
log.mining.transaction.threshold.breached.action
Specifies the action to take when an active transaction exceeds the threshold defined using the log.mining.transaction.age.threshold.ms configuration. When set to discard, the connector drops long-running transactions that exceed the threshold age from the buffer and skips emitting any records associated with these transactions. With warn, the connector logs a warning that mentions the oldest transaction that exceeds the threshold.
- Type: string
- Default: warn
- Valid Values: discard, warn
- Importance: medium
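The relationship between the age threshold and the breached action can be sketched as follows. This is an illustrative model of the behavior described above, not the connector's implementation; the apply_threshold function, the transaction IDs, and the warning text are all hypothetical.

```python
def apply_threshold(ages_ms: dict, threshold_ms: int, action: str = "warn"):
    """Return (buffered transactions, warning or None) for one check.

    ages_ms maps a transaction ID to how long it has been open, in ms.
    """
    if action not in ("discard", "warn"):
        raise ValueError("action must be 'discard' or 'warn'")
    if threshold_ms < 0:
        # -1 (the default): keep every transaction until commit or rollback.
        return dict(ages_ms), None
    breached = {txn: age for txn, age in ages_ms.items() if age > threshold_ms}
    if not breached:
        return dict(ages_ms), None
    if action == "discard":
        kept = {txn: age for txn, age in ages_ms.items() if txn not in breached}
        return kept, None
    # warn: keep everything and report the oldest breaching transaction.
    oldest = max(breached, key=breached.get)
    return dict(ages_ms), f"oldest transaction over threshold: {oldest}"


ages = {"tx1": 1_000, "tx2": 90_000, "tx3": 120_000}
print(apply_threshold(ages, 60_000, "discard"))
print(apply_threshold(ages, 60_000, "warn"))
print(apply_threshold(ages, -1))
```

With discard, records from the dropped transactions are never emitted, so that action trades completeness for a bounded buffer.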
Connection details¶
query.timeout.ms
The timeout in milliseconds for any query submitted to Oracle. The default is 5 minutes (or 300000 milliseconds). If set to negative values, then the connector will not enforce timeout on queries.
- Type: long
- Default: 300000 (5 minutes)
- Importance: medium
max.batch.size
The maximum number of records that will be returned by the connector to Connect. The connector may still return fewer records if no additional records are available.
- Type: int
- Default: 1000
- Valid Values: [100,…,10000]
- Importance: medium
poll.linger.ms
The maximum time to wait for a record before returning an empty batch. The call to poll can return early, before poll.linger.ms expires, if max.batch.size records are received.
- Type: long
- Default: 5000 (5 seconds)
- Valid Values: [0,…,300000]
- Importance: medium
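To illustrate how max.batch.size and poll.linger.ms interact, here is a minimal sketch of a poll loop over an in-memory queue. It models only the behavior described above (return early once the batch is full, otherwise return whatever arrived when the linger time expires); the poll function and the queue are stand-ins, not the connector's internals.

```python
import queue
import time


def poll(source: "queue.Queue", max_batch_size: int, poll_linger_ms: int) -> list:
    """Collect up to max_batch_size records, waiting at most poll_linger_ms."""
    deadline = time.monotonic() + poll_linger_ms / 1000.0
    batch = []
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        try:
            if remaining > 0:
                batch.append(source.get(timeout=remaining))
            else:
                # Linger time is over: take only what is already buffered.
                batch.append(source.get_nowait())
        except queue.Empty:
            break
    return batch


records = queue.Queue()
for i in range(5):
    records.put(f"record-{i}")

print(poll(records, max_batch_size=3, poll_linger_ms=50))  # full batch, returns early
print(poll(records, max_batch_size=3, poll_linger_ms=50))  # partial batch after linger
print(poll(records, max_batch_size=3, poll_linger_ms=50))  # empty batch after linger
```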
max.buffer.size
The maximum number of records from all snapshot threads and from the redo log that can be buffered into batches. The default of 0 means a buffer size will be computed from the maximum batch size and number of threads.
- Type: int
- Default: 0
- Valid Values: [0,…,10000]
- Importance: low
redo.log.poll.interval.ms
The interval between polls to retrieve the database redo log events. This has no effect when using Oracle database versions prior to 19c.
- Type: long
- Default: 500
- Valid Values: [500,…,60000]
- Importance: medium
snapshot.row.fetch.size
The number of rows to provide as a hint to the JDBC driver when fetching table rows in a snapshot. A value of 0 disables this hint.
- Type: int
- Default: 2000
- Valid Values: [0,…,10000]
- Importance: medium
redo.log.row.fetch.size
The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. A value of 0 disables this hint. When continuous mine is available (database versions before Oracle 19c), the mining query from the connector waits until the number of rows available from the redo log is at least the value specified for fetch size before returning the results.
- Type: int
- Default: 5000
- Valid Values: [0,…,10000]
- Importance: medium
redo.log.row.poll.fields.include
A comma-separated list of fields from the V$LOGMNR_CONTENTS view to include in the redo log events.
- Type: list
- Default: “”
- Importance: low
redo.log.row.poll.fields.exclude
A comma-separated list of fields from the V$LOGMNR_CONTENTS view to exclude in the redo log events.
- Type: list
- Default: “”
- Importance: low
redo.log.row.poll.username.exclude
A comma-separated list of database usernames. When this property is set, the connector captures changes only from database users that are not specified in this list. You cannot set this property along with the redo.log.row.poll.username.include property.
- Type: list
- Default: “”
- Importance: low
redo.log.row.poll.username.include
A comma-separated list of database usernames. When this property is set, the connector captures changes only from the specified set of database users. You cannot set this property along with the redo.log.row.poll.username.exclude property.
- Type: list
- Default: “”
- Importance: low
oracle.validation.result.fetch.size
The fetch size to use when querying the database for validations. It applies to the queries that list tables and that validate the supplemental logging level.
- Type: int
- Default: 5000
- Importance: low
Output messages¶
table.topic.name.template
The template that defines the name of the Kafka topic where the change event is written. The value can be a constant if the connector writes all change events from all captured tables to one topic. The value can include any supported template variables, including ${databaseName}, ${schemaName}, ${tableName}, and ${connectorName}. This can be left blank only if the connector has to write events only to the redo log topic and not to the table change event topics. Special characters, including \, $, {, and }, must be escaped with \ when not intended to be part of a template variable.
- Type: string
- Default: ${databaseName}.${schemaName}.${tableName}
- Importance: high
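The sketch below shows how a topic name template of this shape resolves for one table. It is a simplified stand-in for the connector's resolver: the resolve_template function is hypothetical, it handles only plain ${name} variables, and it ignores escaping and bracketed variables such as ${staticMap[]}.

```python
import re

# Matches simple variables of the form ${name}; no escaping support.
_VARIABLE = re.compile(r"\$\{(\w+)\}")


def resolve_template(template: str, values: dict) -> str:
    """Replace each ${name} in the template with values[name]."""

    def replace(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"unknown template variable: {name}")
        return values[name]

    return _VARIABLE.sub(replace, template)


values = {
    "connectorName": "my-connector",  # illustrative connector name
    "databaseName": "ORCL",
    "schemaName": "ADMIN",
    "tableName": "USERS",
}
print(resolve_template("my-prefix.${databaseName}.${schemaName}.${tableName}", values))
# prints: my-prefix.ORCL.ADMIN.USERS
print(resolve_template("${databaseName}.${schemaName}.${tableName}", values))
# prints: ORCL.ADMIN.USERS
```

A constant template such as all-changes contains no variables and resolves to itself, which is how all tables end up in one topic.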
redo.log.corruption.topic
The name of the Kafka topic to which the connector records events that describe corruption in the Oracle redo log and that signify missed data. This can optionally use the template variables ${connectorName}, ${databaseName}, and ${schemaName}. A blank topic name (the default) signals that this information should not be written to Kafka.
- Type: string
- Default: “”
- Importance: high
redo.log.topic.name
The template for the name of the Kafka topic to which the connector records all raw redo log events. This can optionally use the template variables ${connectorName}, ${databaseName}, and ${schemaName}.
- Type: string
- Default: ${connectorName}-${databaseName}-redo-log
- Importance: high
key.template
The template that defines the Kafka record key for each change event. By default, the record key contains a concatenated primary key value delimited by an underscore (_) character. Use ${primaryKeyStructOrValue} to contain either the sole column value of a single-column primary key or a STRUCT with the multi-column primary key fields (or null if the table has no primary or unique key). Use ${primaryKeyStruct} to always use a STRUCT for primary keys that have one or more columns (or null if there is no primary or unique key). If the template contains variables or string literals, the record key is the string with the variables resolved and replaced.
- Type: string
- Default: ${primaryKeyStructOrValue}
- Importance: medium
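The difference between the primary key variables is easiest to see side by side. The sketch below models a STRUCT as a Python dict; the three helper functions are hypothetical. The description does not say what ${primaryKey} resolves to for a table without a key, so returning None in that case is an assumption.

```python
from typing import Any, Optional


def primary_key(pk: dict) -> Optional[str]:
    """Model of ${primaryKey}: key values joined with underscores."""
    if not pk:
        return None  # assumption: behavior without a key is not documented
    return "_".join(str(value) for value in pk.values())


def primary_key_struct(pk: dict) -> Optional[dict]:
    """Model of ${primaryKeyStruct}: always a struct, or None without a key."""
    return dict(pk) if pk else None


def primary_key_struct_or_value(pk: dict) -> Any:
    """Model of ${primaryKeyStructOrValue}: bare value for one column."""
    if not pk:
        return None
    if len(pk) == 1:
        return next(iter(pk.values()))
    return dict(pk)


single = {"ORDER_ID": 42}
composite = {"ORDER_ID": 42, "LINE_NO": 7}
for pk in (single, composite, {}):
    print(primary_key(pk), primary_key_struct(pk), primary_key_struct_or_value(pk))
```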
oracle.dictionary.mode
The dictionary handling mode used by the connector. Options are auto, online, or redo_log.
auto: The connector uses the dictionary from the online catalog until a DDL statement to evolve the table schema is encountered. At this point, the connector starts using the dictionary from archived redo logs. Once the DDL statement has been processed, the connector reverts to using the online catalog. Use this mode if DDL statements are expected.
online: The connector always uses the online dictionary catalog. Use this mode if no DDL statements are expected.
redo_log: The connector always uses the dictionary catalog from archived redo logs. Use this mode if you cannot access the online redo log. Note that any CDC events will be delayed until they are archived from online logs before the connector processes them.
- Type: string
- Default: auto
- Valid Values: auto, online, redo_log
- Importance: low
output.table.name.field
The name of the field in the change record written to Kafka that contains the fully-qualified name of the affected Oracle table. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the fully-qualified name of the affected Oracle table as a header with the given name.
- Type: string
- Default: table
- Importance: low
output.commit.scn.field
The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) when the transaction was committed. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the Oracle system change number (SCN) when the transaction committed as a header with the given name.
- Type: string
- Default: “”
- Importance: low
output.scn.field
The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) when this change was made. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the Oracle system change number (SCN) as a header with the given name.
- Type: string
- Default: scn
- Importance: low
output.before.state.field
The name of the field in the change record written to Kafka that contains the before state of changed database rows for an update operation. A blank value signals that this field should not be included in the change records.
- Type: string
- Default: “”
- Importance: low
output.op.type.field
The name of the field in the change record written to Kafka that contains the operation type for this change event. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation type as a header with the given name.
- Type: string
- Default: op_type
- Importance: low
output.op.ts.field
The name of the field in the change record written to Kafka that contains the operation timestamp for this change event. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation timestamp as a header with the given name.
- Type: string
- Default: op_ts
- Importance: low
output.current.ts.field
The name of the field in the change record written to Kafka that contains the connector’s timestamp when this change event was processed. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the connector’s timestamp when this change event was processed as a header with the given name.
- Type: string
- Default: current_ts
- Importance: low
output.row.id.field
The name of the field in the change record written to Kafka that contains the row ID of the table changed by this event. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the row ID of the table changed by this event as a header with the given name.
- Type: string
- Default: row_id
- Importance: low
output.username.field
The name of the field in the change record written to Kafka that contains the name of the Oracle user that executed the transaction that resulted in this change. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the name of the Oracle user that executed the transaction that resulted in this change as a header with the given name.
- Type: string
- Default: username
- Importance: low
output.redo.field
The name of the field in the change record written to Kafka that contains the original redo DML statement from which this change record was created. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the original redo DML statement from which this change record was created as a header with the given name.
- Type: string
- Default: “”
- Importance: low
output.undo.field
The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change and represents the “before” state of the row. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the original undo DML statement that effectively undoes this change and represents the “before” state of the row as a header with the given name.
- Type: string
- Default: “”
- Importance: low
output.op.type.read.value
The value of the operation type for a read (snapshot) change event. By default this is “R”.
- Type: string
- Default: R
- Importance: low
output.op.type.insert.value
The value of the operation type for an insert change event. By default this is “I”.
- Type: string
- Default: I
- Importance: low
output.op.type.update.value
The value of the operation type for an update change event. By default this is “U”.
- Type: string
- Default: U
- Importance: low
output.op.type.delete.value
The value of the operation type for a delete change event. By default this is “D”.
- Type: string
- Default: D
- Importance: low
lob.topic.name.template
The template that defines the name of the Kafka topic to which LOB objects should be written. The value can be a constant if all LOB objects from all captured tables are to be written to one topic, or the value can include any supported template variables, including ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, ${connectorName}, etc. The default is empty, which ignores all LOB type columns if any exist on captured tables. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. Any character that is not a valid character for a topic name is replaced by an underscore in the topic name.
- Type: string
- Default: “”
- Importance: low
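As an illustration of the underscore replacement mentioned above, the sketch below sanitizes an already-resolved LOB topic name. It assumes the valid character set is the one Kafka allows in topic names (ASCII letters, digits, period, underscore, and hyphen); the sanitize_topic_name helper and the sample names are hypothetical.

```python
import re

# Characters outside the set Kafka accepts in topic names.
_INVALID_TOPIC_CHARS = re.compile(r"[^a-zA-Z0-9._-]")


def sanitize_topic_name(name: str) -> str:
    """Replace every character that is not valid in a topic name with '_'."""
    return _INVALID_TOPIC_CHARS.sub("_", name)


# A common-user schema such as C##MYUSER contains '#', which is not valid.
print(sanitize_topic_name("ORCLPDB1.C##MYUSER.DOCUMENTS.BODY"))
print(sanitize_topic_name("lob-topic"))
```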
snapshot.by.table.partitions
Whether the connector should perform snapshots on each table partition if the table is defined to use partitions. This is false by default, meaning that one snapshot is performed on each table in its entirety.
- Type: boolean
- Default: false
- Importance: low
snapshot.threads.per.task
The number of threads that can be used in each task to perform snapshots. This is only useful for a task if the number of tables assigned to that task exceeds this value.
- Type: int
- Default: 4
- Valid Values: [1,…,10]
- Importance: low
enable.large.lob.object.support
If true, the connector will support large LOB objects that are split across multiple redo log records. The connector will emit commit messages to the redo log topic and use these commit messages to track when a large LOB object can be emitted to the LOB topic.
- Type: boolean
- Default: false
- Importance: low
numeric.mapping
Map NUMERIC values by precision and optionally scale to primitive or decimal types. Use none if all NUMERIC columns are to be represented by Connect’s DECIMAL logical type. Use best_fit_or_decimal if NUMERIC columns should be cast to Connect’s primitive type based on the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s DECIMAL logical type will be used instead, and the values will be represented in binary form within the change events. Use best_fit_or_double if NUMERIC columns should be cast to Connect’s primitive type based on the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s FLOAT64 type will be used instead. Use best_fit_or_string if NUMERIC columns should be cast to Connect’s primitive type based on the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s STRING type will be used instead. Use precision_only to map NUMERIC columns based only on the column’s precision, assuming the column’s scale is 0. The none option is the default but may lead to serialization issues since Connect’s DECIMAL type is mapped to its binary representation. One of the best_fit_or options will often be preferred. For backward compatibility reasons, the best_fit option is also available. It behaves the same as best_fit_or_decimal. This would require deletion of the table topic and the registered schemas if using a non-JSON value.converter.
- Type: string
- Default: none
- Valid Values: best_fit, best_fit_or_decimal, best_fit_or_double, best_fit_or_string, none, precision_only
- Importance: low
numeric.default.scale
The default scale to use for numeric types when the scale cannot be determined.
- Type: int
- Default: 127
- Valid Values: [-127,…,127]
- Importance: low
oracle.date.mapping
Map Oracle DATE values to Connect types. Use date if all DATE columns are to be represented by Connect’s Date logical type. Use timestamp if DATE columns should be cast to Connect’s Timestamp. Despite the name similarity, the Oracle DATE type has different semantics than Connect Date. timestamp will often be preferred for semantic similarity.
- Type: string
- Default: timestamp
- Valid Values: date, timestamp
- Importance: low
output.data.key.format
Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: JSON
- Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
- Importance: high
output.data.value.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: JSON
- Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
- Importance: high
Number of tasks for this connector¶
tasks.max
Maximum number of tasks to use for this connector.
- Type: int
- Default: 2
- Valid Values: [1,…,1000]
- Importance: high
Template variables¶
The connector uses template variables to create the name of the Kafka topic and the record key for each of the change events. The variables are similar to the Oracle GoldenGate Kafka Connect template variables, which simplifies migrating from Oracle GoldenGate to this connector. Variables are resolved at the task level and table level.
Connector and task variables¶
Variable keyword | Description |
---|---|
${connectorName} | Resolves to the name of the connector. |
${databaseName} | Resolves to the database name. |
${emptyString} | Resolves to an empty string. |
${staticMap[]} | Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside of the square braces, in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]} . |
${currentTimestamp} or ${currentTimestamp[]} | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp} , ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]} |
Table variables¶
Variable keyword | Description |
---|---|
${schemaName} | Resolves to the schema name for the table. |
${tableName} | Resolves to the short table name. |
${fullyQualifiedTableName} | Resolves to the fully-qualified table name including the period (.) delimiter between the schema and table names. For example, dbo.table1 . |
Column variables¶
Variable keyword | Description |
---|---|
${columnName} | Resolves to the column name. |
Record variables¶
Variable keyword | Description |
---|---|
${opType} | Resolves to the type of the operation: READ, INSERT, UPDATE, DELETE, or TRUNCATE. |
${opTimestamp} | Resolves to the operation timestamp from the redo log. |
${rowId} | Resolves to the ID of the changed row. |
${primaryKey} | Resolves to the concatenated primary key values delimited by an underscore (_ ) character. |
${primaryKeyStruct} | Resolves to a STRUCT with fields for each of the primary key column values. |
${primaryKeyStructOrValue} | Resolves to either a STRUCT with fields for each of the 2+ primary key column values, or the column value if the primary key contains a single column. |
${scn} | Resolves to the system change number (SCN) when the change was made. |
${cscn} | Resolves to the system change number (SCN) when the change was committed. |
${rbaseq} | Resolves to the sequence number associated with the Redo Block Address (RBA) of the redo record associated with the change. |
${rbablk} | Resolves to the RBA block number within the log file. |
${rbabyte} | Resolves to the RBA byte offset within the block. |
${currentTimestamp} or ${currentTimestamp[]} | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp} , ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]} |
Supported Data Types¶
The following table lists data types and the associated Connect mapping.
Oracle data type | SQL type code | Connect mapping |
---|---|---|
CHAR or CHARACTER | 1 | STRING |
LONG | 1 | STRING |
VARCHAR | 12 | STRING |
VARCHAR2 | 12 | STRING |
NCHAR | -15 | STRING |
NVARCHAR2 | -9 | STRING |
RAW | -3 | BYTES |
LONG RAW | -1 | BYTES |
INT or INTEGER | 2 | DECIMAL |
SMALLINT | 2 | DECIMAL |
DEC or DECIMAL | 2 | DECIMAL |
NUMBER | 2 | DECIMAL |
NUMERIC | 2 | DECIMAL |
DOUBLE PRECISION | 6 | FLOAT64 |
FLOAT | 6 | FLOAT64 |
REAL | 6 | FLOAT64 |
TIMESTAMP WITH TIME ZONE | -101 | TIMESTAMP |
TIMESTAMP WITH LOCAL TIME ZONE | -102 | TIMESTAMP |
DATE | 91 | DATE |
BLOB | 2004 | BYTES |
CLOB | 2005 | BYTES |
NCLOB | 2011 | BYTES |
XMLTYPE | 2009 | BYTES |
Note
The -101 and -102 codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE are Oracle-specific. BLOB, CLOB, NCLOB, and XMLTYPE are handled out-of-band with a separate LOB topic.
Troubleshooting and Tips¶
Note the following information.
Pre-19c database¶
Consider decreasing query.timeout.ms if the tables you are capturing records from do not have much activity.
Degraded condition with Error while polling for records¶
When this connector is created, you may see the connector is degraded with the following error event.
{
  "datacontenttype": "application/json",
  "data": {
    "level": "ERROR",
    "context": {
      "connectorId": "lcc-12abc3"
    },
    "summary": {
      "connectorErrorSummary": {
        "message": "Error while polling for records",
        "rootCause": "Failed to subscribe to the redo log topic 'lcc-22rwg2-ORCL-redo-log' even after waiting PT5M. Verify that this redo log topic exists in the brokers at SASL_SSL://<address>, and that the redo log reading task is able to produce to that topic."
      }
    }
  },
  "subject": "lcc-12abc3-llcc-12abc3-1",
  "specversion": "1.0",
  "id": "a12345bc-6d78-91e0-fg11-c3ac4f20220b",
  "source": "crn://confluent.cloud/connector=lcc-12abc3",
  "time": "2022-03-28T18:25:29.631Z",
  "type": "io.confluent.logevents.connect.TaskFailed"
}
This error may resolve on its own once new inserts, updates, or deletes occur. Otherwise, you can increase redo.log.startup.polling.limit.ms for the connector so that it waits longer for the redo log topic to be created.
Alternatively, you can create a topic named redo-log-topic with one partition and specify it with "redo.log.topic.name": "redo-log-topic" when you create the connector.
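The two workarounds above can be expressed as a fragment of connector configuration. The sketch below builds only the two relevant properties and prints them as JSON; it is not a complete connector configuration, the 10-minute wait is an arbitrary illustrative value, and the redo-log-topic topic is assumed to have been created beforehand.

```python
import json

# Upper bound taken from the valid range of redo.log.startup.polling.limit.ms.
MAX_STARTUP_POLLING_LIMIT_MS = 3_600_000

startup_wait_ms = 600_000  # illustrative: wait 10 minutes instead of 5
if not 0 <= startup_wait_ms <= MAX_STARTUP_POLLING_LIMIT_MS:
    raise ValueError("redo.log.startup.polling.limit.ms is out of range")

# Fragment only: merge these into the rest of the connector configuration.
overrides = {
    "redo.log.topic.name": "redo-log-topic",
    "redo.log.startup.polling.limit.ms": str(startup_wait_ms),
}
print(json.dumps(overrides, indent=2))
```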
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.