Oracle CDC Source Connector for Confluent Cloud Overview and Features¶
Oracle 11g, 12c and 18c Deprecation
Oracle discontinued support for the following Oracle Database versions:
- Version 11g on December 31, 2020
- Version 12c on March 31, 2022
- Version 18c on June 30, 2021
Oracle CDC connector support for each of these versions will reach end-of-life on June 30, 2025. Confluent currently supports Oracle Database versions 19c and later.
The fully-managed Oracle CDC Source connector for Confluent Cloud captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics.
Redo log topic¶
The connector reads the Oracle database redo log and writes each raw redo log event as a separate Kafka record. The connector queries the V$LOGMNR_CONTENTS view. Each row in the result set that applies to one of the matched tables is converted to a record with a field for each column in the result set. The connector will write to this topic using an at-least-once guarantee. The connector may rewrite a portion of the redo log event records upon restart.
To access redo log topics, you must grant the connector the corresponding operation (that is, CREATE, READ, or WRITE) in an ACL. For output topics, you must grant the connector either CREATE or WRITE in an ACL. When granted READ, WRITE, or DELETE, the connector implicitly derives the DESCRIBE operation. See Oracle CDC Source connector ACLs for additional information and ACL examples.
Caution
- Redo log topics should not be shared among connectors, as this may cause unexpected behavior.
- Currently, the connector only supports writing to the redo log topic with one partition.
- Confluent recommends you increase your log retention policies to 24 hours. If you have a shorter retention policy and your table doesn't have much activity, the connector might not be able to find a record with the last committed SCN.
- When a connector is in the running state, you may not see a redo log topic being created (redo.log.topic.name). The connector will create the topic when a new INSERT, UPDATE, or DELETE is committed to the tables the connector is configured for. You may also create the topic, with one partition, before you run the connector (a configuration sketch follows this list).
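As an illustration, a minimal configuration fragment that names the redo log topic explicitly might look like the following sketch. The topic name is a placeholder; if you pre-create the topic, give it a single partition as noted above.

{
  "redo.log.topic.name": "ORCLCDB-redo-log"
}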
Redo log corruption topic¶
The Oracle redo logs themselves may be corrupted. The connector does not terminate if LogMiner reports corrupted blocks or segments. You can configure the connector to write the corrupted block details to a separate Kafka topic, allowing downstream consumers to use this information to track and react to Oracle redo log file corruptions.
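As a sketch, and assuming the corruption topic is configured through a property named redo.log.corruption.topic (verify the exact property name against the configuration reference), routing corruption details to a dedicated topic might look like this, with a placeholder topic name:

{
  "redo.log.corruption.topic": "ORCLCDB-redo-log-corruption"
}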
Table change event topics¶
The connector can turn raw logs into change events for each table and write these to Kafka topics using the configured table-to-topic mapping.
Pattern match tables to be captured¶
The connector configuration uses two regular expressions to identify the tables in the database that it should capture. The connector captures events from all tables in the database whose fully-qualified names (for example, <database-name>.<schema-name>.<table-name>) are matched by the include expression, unless explicitly excluded by matching the exclude expression.
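For example, assuming the include and exclude expressions are supplied through the table.inclusion.regex and table.exclusion.regex properties, the following sketch captures every table in a hypothetical MYUSER schema of the ORCLCDB database except tables whose names start with AUDIT_:

{
  "table.inclusion.regex": "ORCLCDB\\.MYUSER\\..*",
  "table.exclusion.regex": "ORCLCDB\\.MYUSER\\.AUDIT_.*"
}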
Flexible mapping of tables to Kafka topics¶
The connector configuration specifies a template that identifies the name of the Kafka topic to which the events are written. This template is resolved into a topic name for every change event and can use literals or template variables, including the schema name, table name, database name, various timestamps, and transaction IDs. This gives users a flexible way of identifying the names of the Kafka topics where the change events are written.
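As a hedged example, assuming the mapping is set through the table.topic.name.template property and that per-name template variables are available (see Template variables for the supported set), a topic per table could be expressed as:

{
  "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}"
}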
Record keys¶
The records that the connector writes to Kafka topics have (by default) a key
corresponding to the primary key column values for the corresponding row in the
database. If the primary key consists of a single column, the Kafka record’s key
will contain the value of the column for that row. If the primary key consists
of multiple columns, the Kafka record’s key will be a STRUCT containing a field
for each of the primary key’s columns. You can change this behavior by setting
the key.template
configuration property. As with other Connect source
connectors, each record’s key determines the topic partition where the connector
writes the record.
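As a sketch, and assuming ${primaryKeyStructOrValue} is among the supported template variables, the default key behavior can be stated explicitly, or replaced with another template variable or a literal:

{
  "key.template": "${primaryKeyStructOrValue}"
}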
Snapshots¶
When you first start a connector, it attempts to obtain a snapshot of all
existing rows in each table, writing these (as records) to the Kafka topic for
the table, before starting to capture changes made to those rows. This
results in the Kafka topic containing records for every row in the database
table. However, if the Kafka topic should only contain records from a specific
point in time, you can use the start.from
configuration property to specify an SCN or timestamp. This will
set the point where the connector will start capturing events for all tables.
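For example, to capture events only from a specific point onward rather than snapshotting all existing rows, a configuration sketch might set start.from to an SCN (the value below is a placeholder):

{
  "start.from": "2169287"
}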
Note
If the connector is interrupted, stopped, or fails while performing a snapshot of any tables, then upon recovery or restart the connector restarts all incomplete snapshots from the beginning. It is currently not possible to resume a snapshot of a table that is changing while ensuring that all changes to that table have been captured.
Table partition snapshots¶
The connector has an advanced feature of table partition snapshots. With this feature, the connector performs snapshots, in parallel, of large tables that are partitioned in Oracle, and distributes these table-partition snapshots across all tasks. This helps to scale the number of tasks linearly, so more snapshots are performed in parallel across a larger number of tasks. For example, a connector can capture and snapshot a single large table (N=1) with many table partitions (for example, P=20) using up to P+1 tasks. This reduces the overall time required to perform the snapshot by scaling out the number of tasks.
Note
When running a connector with snapshot.by.table.partitions=true, create table-specific topics ahead of time. If table-specific topics are not created ahead of time, some tasks assigned to partitioned tables will fail. In addition, ensure target tables are all partitioned. If you have a mix of non-partitioned tables and partitioned tables, use snapshot.by.table.partitions=false. To view the property description, go to snapshot.by.table.partitions.
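Building on the example above (one large table with 20 partitions, so up to P+1 = 21 tasks), a configuration sketch might look like the following; the task count is illustrative:

{
  "snapshot.by.table.partitions": "true",
  "tasks.max": "21"
}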
Large object types¶
You can configure the connector to capture changes in tables that contain
columns with binary large object (BLOB), character large object (CLOB), and
national character large object (NCLOB) types. These large object (LOB) types are
written to separate LOB topics that can be consumed by downstream applications.
To enable this feature, specify a template variable to use in the lob.topic.name.template configuration property (see Template variables for supported variables). When enabled, the connector writes LOB objects to a separate topic; each record key consists of the full table name, the column name, and the primary key of the change record for the LOB object's row. A configuration sketch follows the note below.
Note
- A table that contains large object (LOB) type columns must include primary keys.
- LOB objects of more than 1 KB in size are supported. Ensure you set enable.large.lob.object.support to true.
- Be careful when updating primary key values that are used in association with LOB topics. When an update to the primary key is processed, the connector emits the updated record to the change event topic, but does not retroactively update the LOB record key.
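A sketch that combines the properties mentioned above: a LOB topic name template (the template variables shown are illustrative; see Template variables for what this property supports) together with large LOB object support enabled:

{
  "lob.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
  "enable.large.lob.object.support": "true"
}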
Auto-table set sync and task reconfiguration¶
Tables can be deleted and created in the Oracle database while the connector is running. The connector periodically checks for newly added or recently dropped tables that match the tables to be captured. When the connector identifies new or dropped tables, it automatically reconfigures its tasks to stop watching the dropped tables and to begin capturing changes from the newly added tables.
Scalable database workloads¶
The connector is designed to scale from small to large database workloads using connector tasks. The connector can be configured to use as few as one task (tasks.max=1) or scale to as many tasks as required to capture all table changes.
Micro-rebalancing of task loads¶
Upon startup, the connector evenly distributes tables across its tasks. The connector monitors throughput variations for each table and the position of each task in the redo log. The connector automatically attempts to distribute the load across all of the connector’s tasks by assigning frequently-changing tables to different tasks.
Automatic creation of Kafka topics¶
You can include rules in your connector configuration that define the topic settings for any topic that the source connector writes to.
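As a hedged illustration, and assuming the standard Connect topic.creation.* rule properties are accepted here, topic settings for auto-created topics might be sketched as follows, with placeholder values:

{
  "topic.creation.default.partitions": "1",
  "topic.creation.default.replication.factor": "3"
}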
Note
When running a connector with snapshot.by.table.partitions=true, create table-specific topics ahead of time. If table-specific topics are not created ahead of time, some tasks assigned to partitioned tables will fail. In addition, ensure target tables are all partitioned. If you have a mix of non-partitioned tables and partitioned tables, use snapshot.by.table.partitions=false. To view the property description, go to snapshot.by.table.partitions.
Automated reconnection¶
The connector is able to automatically reconnect when the connection to the database is disrupted or interrupted. When a connection is lost, the connector stops, logs disconnection warning or error messages, and attempts to reconnect using exponential backoff. Once the connection is re-established, the connector automatically resumes normal operation. A few connection properties control this behavior, including query.timeout.ms (defaults to 5 minutes).
Oracle multi-tenant CDB/PDB architecture support¶
Oracle provides multitenant architecture support in Oracle Database 12c and later. System tables are stored in a single container database (CDB). User tables are stored in pluggable databases (PDBs) plugged into the CDB. Each instance of the connector can read user tables that reside in one PDB. The PDB name where user tables reside can be configured using the oracle.pdb.name property. To read from system tables in the CDB, or to read from a legacy 11g database, leave the oracle.pdb.name configuration property blank. The oracle.sid property must be set to the Oracle system identifier (SID) to access a CDB, a PDB, or a legacy non-multitenant database.
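For example, a multitenant setup where user tables reside in the ORCLPDB1 pluggable database of the ORCLCDB container (the same names used in the offset examples later on this page) would be configured roughly as follows; leave oracle.pdb.name blank to read from the CDB or a legacy 11g database:

{
  "oracle.sid": "ORCLCDB",
  "oracle.pdb.name": "ORCLPDB1"
}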
Manage long-running transactions¶
This section is applicable only when the use.transaction.begin.for.mining.session configuration is set to true.
A long-running transaction can have a negative impact on connector performance. It can cause each subsequent mining session to do more work, and hence take more time to complete, leading to increased lag in processing change events. It can also increase the memory usage of the connector: the connector stores events belonging to a transaction in an in-memory buffer until it receives the commit or rollback event for that transaction. A long-running transaction with a large number of changes therefore leads to increased memory usage, which could eventually result in an out-of-memory error if there is insufficient available memory.
As general guidance, Confluent recommends you avoid long-running transactions, since they can impact the scalability of the system and could lead to deadlocks. It is best to change applications so that they do not use long-running transactions.
Where changing the application is not feasible, consider using the log.mining.transaction.age.threshold.ms and log.mining.transaction.threshold.breached.action configurations to drop long-running transactions. Use these properties with caution, since dropping transactions could result in potential data loss.
Use the log.mining.transaction.age.threshold.ms configuration to define a threshold (in milliseconds) for transaction age. Transaction age is defined as the duration the transaction has been open on the database. If the transaction age exceeds this threshold, an action is taken depending on the value set for the log.mining.transaction.threshold.breached.action configuration. The default value is -1, which means that a transaction is retained in the buffer until the connector receives the commit or rollback event for the transaction. Do not set this threshold to a value greater than the retention period for archived redo log files.
Use the log.mining.transaction.threshold.breached.action configuration to control the action to take when an active transaction exceeds the threshold defined by the log.mining.transaction.age.threshold.ms configuration. Possible options are discard and warn (default). Use discard to drop long-running transactions that exceed the threshold age from the buffer and skip emitting any records associated with these transactions. Use warn to log a warning that mentions the oldest transaction that exceeds the threshold.
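As a sketch, the following fragment enables transaction-begin-based mining sessions and discards transactions that have been open for more than one hour; the one-hour threshold is only an example, and discard can result in data loss as noted above:

{
  "use.transaction.begin.for.mining.session": "true",
  "log.mining.transaction.age.threshold.ms": "3600000",
  "log.mining.transaction.threshold.breached.action": "discard"
}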
Archived redo log destination¶
The connector provides a configuration to specify the archived redo log destination
used to read the redo log files. Use the log.mining.archive.destination.name
configuration
to specify the name of the log archive destination to use when mining archived redo logs. You
can configure the connector to use a specific destination using the destination name, for
example, LOG_ARCHIVE_DEST_1.
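For example, to mine archived redo logs from the first archive destination:

{
  "log.mining.archive.destination.name": "LOG_ARCHIVE_DEST_1"
}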
Before state for update operation¶
For an update operation, the connector by default populates the change event record with the state of the row after the change occurs. You can configure the connector to also include the state of the row before the change by specifying a non-empty value for the output.before.state.field configuration. The specified value is used as the field name under which the before state is populated in the change event record. There are no changes to the change event records for snapshot, insert, and delete operations when using this feature. A configuration sketch follows the note below.
Note
- If the value converter used is JSON_SR, then the schema evolution is not backward compatible.
- This is not supported for columns with large object (LOB) types.
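As mentioned above, a configuration sketch might nest the before image under a field named before; the field name is your choice:

{
  "output.before.state.field": "before"
}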
Manage custom offsets¶
You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To manage offsets:
- Manage offsets using Confluent Cloud APIs. For more information, see Cluster API reference.
To get the current offset, make a GET
request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"sidPdb": "ORCLCDB.ORCLPDB1"
},
"offset": {
"scn": "2169287",
"tablePlacement": ""
}
}
],
"metadata": {
"observed_at": "2024-03-28T17:57:48.139635200Z"
}
}
Responses include the following information:
- The position of the latest offset.
- The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly will fetch more recently observed offsets.
- Information about the connector.
- In these examples, the curly braces around “{connector_name}” indicate a replaceable value.
To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": {
"sidPdb": "ORCLCDB.ORCLPDB1"
},
"offset": {
"scn": "2169287",
"tablePlacement": ""
}
}
]
}
Considerations:
- You can only make one offset change at a time for a given connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- For source connectors, the connector attempts to read from the position defined by the requested offsets.
- The Oracle CDC Source connector already provides the start.from configuration, which allows you to specify an offset for the connector. For more information, see Configuration Properties.
- If there are schema changes between the current offset and the earlier offset you are resetting to, the connector fails.
- Do not reset the offset when the connector is in snapshot phase. If you interrupt a snapshot, the connector must start again from the beginning.
- The SCN cannot be arbitrary. Choose an SCN that is part of either the archived or online redo logs.
- If the SCN is older than what is available in either the archived or online redo logs, the connector fails with an error similar to this:
ORA-01291: missing log file
- If the SCN is reset to a value greater than the last stored offset by the connector, this could lead to data loss.
- Resetting to an earlier value creates duplicate entries in the redo log topic and table topics.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"sidPdb": "ORCLCDB.ORCLPDB1"
},
"offset": {
"scn": "2169287",
"tablePlacement": ""
}
}
],
"requested_at": "2024-03-28T17:58:45.606796307Z",
"type": "PATCH"
}
Responses include the following information:
- The requested position of the offsets in the source.
- The time of the request to update the offset.
- Information about the connector.
To delete the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
Considerations:
- Delete requests delete the offset for the provided partition and reset it to the base state. A delete request is equivalent to creating a fresh new connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not issue delete and patch requests at the same time.
- For source connectors, the connector attempts to read from the position defined in the base state.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the result.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2024-03-28T17:59:45.606796307Z",
"type": "DELETE"
}
Responses include the following information:
- Empty offsets.
- The time of the request to delete the offset.
- Information about the Kafka cluster and connector.
- The type of request.
To get the status of a previous offset request, make a GET
request that specifies the environment, Kafka cluster, and connector
name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
Considerations:
- The status endpoint always shows the status of the most recent PATCH/DELETE operation.
Response:
Successful calls return HTTP 200
with a JSON payload that describes the result. The following is an example
of an applied patch.
{
"request": {
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"sidPdb": "ORCLCDB.ORCLPDB1"
},
"offset": {
"scn": "2169287",
"tablePlacement": ""
}
}
],
"requested_at": "2024-03-28T17:58:45.606796307Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [],
"applied_at": "2024-03-28T17:58:48.079141883Z"
}
Responses include the following information:
- The original request, including the time it was made.
- The status of the request: applied, pending, or failed.
- The time you issued the status request.
- The previous offsets. These are the offsets that the connector last updated prior to updating the offsets. Use these to try to restore the state of your connector if a patch update causes your connector to fail or to return a connector to its previous state after rolling back.
JSON payload¶
The table below offers a description of the unique fields in the JSON payload for managing offsets of the Oracle CDC Source connector.
Field | Definition | Required/Optional |
---|---|---|
sidPdb | A combination of Oracle System Identifier (SID) and pluggable database (PDB) name separated with a dot character, if you use a PDB. Otherwise, you only need the SID. | Required |
scn | The starting Oracle System Change Number (SCN). The connector captures transactions that begin at or after this SCN. The connector will not capture transactions that begin before this SCN, even if they commit at or after this SCN. | Required |
tablePlacement | The table placement plan specifies how tables are placed/assigned to partitions within the redo log topic. This field is reserved for future use and must be set to an empty string. | Optional |