Oracle CDC Source Connector for Confluent Platform¶
Oracle 11g, 12c and 18c Deprecation
Oracle discontinued support for the following Oracle Database versions:
- Version 11g on December 31, 2020
- Version 12c on March 31, 2022
- Version 18c on June 30, 2021
Oracle CDC connector support for each of these versions will reach end-of-life on June 30, 2025. Confluent currently supports Oracle Database versions 19c and later.
The Kafka Connect Oracle CDC Source connector captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics. The connector uses Oracle LogMiner to read the database redo log. The connector requires a database user with permissions to use LogMiner and permissions to select from all of the tables captured by the connector. For additional information, see Oracle Database Prerequisites.
The connector can be configured to capture a subset of the tables in a single database, defined as all tables accessible by the user that match an include regular expression. It can also be configured to not capture tables that match a separate exclude regular expression.
The connector writes the changes from each of the tables to Kafka topics, where the table-to-topic mapping is determined by the table.topic.name.template connector configuration property. This property defaults to the dot-delimited fully-qualified name of the table (for example, database_name.schema_name.table_name).
The connector recognizes literals and several variables (for example, ${tableName} and ${schemaName}) to customize table-to-topic mapping. Variables are resolved at runtime. For example, the following configuration property results in changes to the ORCL.ADMIN.USERS table being written to the Kafka topic named my-prefix.ORCL.ADMIN.USERS.
table.topic.name.template=my-prefix.${databaseName}.${schemaName}.${tableName}
For a list of template variables, see Template variables.
The connector is designed to write all of the raw Oracle redo log records to one Kafka topic, logically referred to as the “redo log topic”. The redo.log.topic.name configuration property determines the name of this topic. The connector then consumes this topic to identify and produce all of the table-specific events written to the table-specific topics. To write only to the redo log topic without generating table-specific events, set the table.topic.name.template property to an empty string.
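For example, a minimal sketch of this redo-log-only mode (the topic name is illustrative):
redo.log.topic.name=oracle-redo-log
# An empty template disables the table-specific change event topics
table.topic.name.template=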
There are many other configuration properties. For example, the metadata associated with each database change event can be included in the Kafka record header or in extra fields (with user-defined field names) in the Kafka record value.
Tip
To check out the Oracle CDC Source connector, complete the demo scenario in the blog article introducing the Oracle CDC Source connector.
Features¶
The Connect Oracle CDC Source connector includes the following features:
- Redo log topic
- Redo log corruption topic
- Table change event topics
- Pattern match tables to be captured
- Flexible mapping of tables to Kafka topics
- Record keys
- Snapshots
- Table partition snapshots
- Large object types
- Auto-table set sync and task reconfiguration
- Scalable database workloads
- Micro-rebalancing of task loads
- Automatic creation of Kafka topics
- Automated reconnection
- Oracle multi-tenant CDB/PDB architecture support
- Kerberos integration
- LDAP URL Support
- Connection reuse through connection pooling
- Archived redo log destination
- Infrequently Updated Databases
- Log sensitive data
- Manage long-running transactions
- Before state for update operation
Redo log topic¶
The connector reads the Oracle database redo log and writes each raw redo log event as a separate Kafka record. The connector queries the V$LOGMNR_CONTENTS view. Each row in the result set that applies to one of the matched tables is converted to records with a field for each column in the result set. The connector will write to this topic using an at-least-once guarantee. This means that, following an ungraceful stop of the Connect worker, the connector may rewrite a portion of the redo log event records upon restart.
To access redo log topics, you must grant the connector the corresponding operations (CREATE, READ, or WRITE) in an ACL. For output topics, you must grant the connector either CREATE or WRITE in an ACL. When granted READ, WRITE, or DELETE, the connector implicitly derives the DESCRIBE operation. For more information about ACL authorization controls and operations, see Authorization using ACLs.
Caution
- Redo log topics should not be shared among connectors; sharing them may cause unexpected behavior.
- Currently, the connector only supports writing to the redo log topic with one partition.
- Confluent recommends you increase your log retention policy to 24 hours. If you have a shorter retention policy and your table has little activity, the connector might not be able to find a record with the last committed SCN.
- When a connector is in the running state, you may not see the redo log topic (redo.log.topic.name) being created. The connector creates the topic when a new INSERT, UPDATE, or DELETE is committed to one of the tables the connector is configured to capture. You may also create the topic with one partition before you run the connector.
Redo log corruption topic¶
It is possible that the Oracle redo logs themselves may be corrupted. The connector will not terminate if LogMiner reports corrupted blocks or segments. It is possible to configure the connector to write the corrupted block details to a separate Kafka topic, allowing downstream consumers to use this information to track and react to Oracle redo log file corruptions.
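For example, a minimal sketch that routes corrupted block details to a dedicated topic using the redo.log.corruption.topic property (the topic name is illustrative):
redo.log.corruption.topic=oracle-redo-log-corruption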
Table change event topics¶
The connector can turn raw logs into change events for each table and write these to Kafka topics using the configured table-to-topic mapping.
Pattern match tables to be captured¶
The connector configuration uses two regular expressions to identify the tables in the database that it should capture. The connector captures events from all tables in the database whose fully-qualified names (for example, dbo.Users) are matched by the include expression, unless explicitly excluded by matching the exclude expression.
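For example, a minimal sketch, assuming the include and exclude expressions are supplied through the table.inclusion.regex and table.exclusion.regex properties (the patterns are illustrative):
table.inclusion.regex=ORCL[.]ADMIN[.].*
table.exclusion.regex=ORCL[.]ADMIN[.]AUDIT_.*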
Flexible mapping of tables to Kafka topics¶
The connector configuration specifies a template that identifies the names of the Kafka topic to which the events are written. This template is resolved into a name for every change event and can use literals or template variables including the schema name, table name, database name, various timestamps, and transaction IDs. This gives users a flexible way of identifying the names of the Kafka topics where the change events are written.
Record keys¶
The records that the connector writes to Kafka topics have (by default) a key corresponding to the primary key column values for the corresponding row in the database. If the primary key consists of a single column, the Kafka record’s key will contain the value of the column for that row. If the primary key consists of multiple columns, the Kafka record’s key will be a STRUCT containing a field for each of the primary key’s columns. You can change this behavior by setting the key.template configuration property. As with other Connect source connectors, each record’s key determines the topic partition where the connector writes the record.
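For example, a minimal sketch that keys each record on the concatenated primary key values, using a variable from the Template variables section (the choice of variable is illustrative):
key.template=${primaryKey}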
Snapshots¶
When a connector is first started, it attempts to obtain a snapshot of all existing rows in each table, writing these (as records) to the Kafka topic for the table, before starting to capture changes made to those rows. This results in the Kafka topic containing records for every row in the database table. However, if the Kafka topic should only contain records from a specific point in time, you can use the start.from configuration property to specify an SCN or timestamp. This sets the point where the connector will start capturing events for all tables.
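For example, a minimal sketch that starts capturing events from a specific SCN rather than from a snapshot of all existing rows (the SCN value is illustrative):
start.from=2162249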
Note
If the connector is interrupted, stopped, or fails while performing a snapshot of any table, the connector restarts all incomplete snapshots from the beginning upon recovery or restart. It is currently not possible to resume a snapshot of a table that is changing while ensuring that all changes to that table have been captured.
Table partition snapshots¶
When used on CP 6.x or later, the connector has an advanced feature of table partition snapshots. With this feature, the connector performs snapshots, in parallel, of large tables that are partitioned in Oracle, and distributes these table-partition snapshots across all tasks. This helps to scale the number of tasks linearly, so more snapshots are performed in parallel across a larger number of tasks. For example, a connector can capture and snapshot a single large table (N=1) with many table partitions (for example, P=20) using up to P+1 tasks. This reduces the overall time required to perform the snapshot by scaling out the number of tasks.
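For example, a minimal sketch for the single large table with 20 partitions described above, allowing up to P+1 tasks (the values are illustrative):
snapshot.by.table.partitions=true
tasks.max=21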
Note
When running a connector with snapshot.by.table.partitions=true, create table-specific topics ahead of time. If table-specific topics are not created ahead of time, some tasks assigned to partitioned tables will fail. In addition, ensure target tables are all partitioned. If you have a mix of non-partitioned tables and partitioned tables, use snapshot.by.table.partitions=false. To view the property description, go to snapshot.by.table.partitions.
Large object types¶
You can configure the connector to capture changes in tables that contain columns with binary large object (BLOB), character large object (CLOB), and national character large object (NCLOB) types. These large object (LOB) types are written to separate LOB topics that can be consumed by downstream applications. To enable this feature, specify a template variable to use in the lob.topic.name.template configuration property (see Template variables for supported variables). When enabled, the connector writes LOB objects to a separate topic, with each record key consisting of the full table name, column name, and primary key of the change record for the LOB object row.
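For example, a minimal sketch that routes each LOB column to its own topic by combining table and column template variables (the template itself is illustrative):
lob.topic.name.template=${fullyQualifiedTableName}.${columnName}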
Note
- A table that contains large object (LOB) type columns must include primary keys.
- LOB objects larger than 1 KB are supported. Ensure you set enable.large.lob.object.support to true.
- Be careful when updating primary key values that are used in association with LOB topics. When an update to the primary key is processed, the connector emits the updated record to the change event topic, but does not retroactively update the LOB record key.
Auto-table set sync and task reconfiguration¶
Tables can be deleted and created in the Oracle database while the connector is running. The connector periodically checks for newly added or recently dropped tables that match the tables to be captured. When the connector identifies new or deleted tables, the connector automatically reconfigures its tasks to stop watching the deleted tables and begin capturing changes from new tables that match the table filter expressions.
Scalable database workloads¶
The connector is designed to scale from small to large database workloads using connector tasks. The connector can be configured to use as few as one task (tasks.max=1) or scale to as many tasks as required to capture all table changes.
Micro-rebalancing of task loads¶
This feature applies only to connectors in a Connect cluster running Confluent Platform 6.0 or later. Upon startup, the connector evenly distributes tables across its tasks. The connector monitors throughput variations for each table and the position of each task in the redo log. The connector automatically attempts to distribute the load across all of the connector’s tasks by assigning frequently-changing tables to different tasks.
Automatic creation of Kafka topics¶
This feature applies only to connectors in a Connect cluster running Confluent Platform 6.0 or later. You can include rules in your connector configuration that define the topic settings for any topic that the source connector writes to. If you are using an earlier version of Confluent Platform, either create the Kafka topics ahead of time or configure your Kafka brokers to automatically create topics (see the broker configuration properties).
Note
When running a connector with snapshot.by.table.partitions=true, create table-specific topics ahead of time. If table-specific topics are not created ahead of time, some tasks assigned to partitioned tables will fail. In addition, ensure target tables are all partitioned. If you have a mix of non-partitioned tables and partitioned tables, use snapshot.by.table.partitions=false. To view the property description, go to snapshot.by.table.partitions.
Automated reconnection¶
The connector is able to automatically reconnect when the connection to the database is disrupted or interrupted. When a connection is lost, the connector stops, logs a disconnection warning or error message, and attempts to reconnect using exponential backoff. Once the connection is re-established, the connector automatically resumes normal operation. Several connection properties control this behavior, including query.timeout.ms (defaults to 5 minutes) and max.retry.time.ms (defaults to 24 hours). You can change these values. Set max.retry.time.ms to 0 to disable automated reconnection.
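For example, a minimal sketch that keeps the default query timeout but caps reconnection attempts at one hour (values are in milliseconds; the one-hour value is illustrative):
query.timeout.ms=300000
max.retry.time.ms=3600000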
Oracle multi-tenant CDB/PDB architecture support¶
Oracle provides multitenant architecture support. System tables are stored in a single container database (CDB). User tables are stored in pluggable databases (PDBs) plugged into the CDB. Each instance of the Oracle CDC connector can read user tables that reside in one PDB. The PDB name where user tables reside can be configured using the oracle.pdb.name property. To read from system tables in the CDB, leave the oracle.pdb.name configuration property blank. The oracle.sid property must be set to the Oracle system identifier (SID) to access either a CDB, a PDB, or a legacy non-multitenant database. LogMiner needs all PDBs in the container database to be opened in READ WRITE mode to build a dictionary.
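For example, a minimal sketch for capturing user tables in one PDB (the SID and PDB names are illustrative):
oracle.sid=ORCLCDB
oracle.pdb.name=ORCLPDB1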
Note
The connector does not support the multitenant container database for Amazon RDS.
Kerberos integration¶
Use the oracle.kerberos.cache.file configuration property to set the location of the Kerberos ticket cache file. For an example, see Using Kerberos authentication.
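For example, a minimal sketch pointing the connector at a Kerberos ticket cache file (the path is hypothetical):
oracle.kerberos.cache.file=/tmp/krb5cc_oracle_cdc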
LDAP URL Support¶
Use the ldap.url configuration property to set the LDAP connection URL for the database.
Note
- This connector supports Oracle Internet Directory (OID) based implementations.
- Simple authentication is supported. You must configure the ldap.security.principal and ldap.security.credentials configuration properties to use authentication.
- The connector has been tested to work with OID version 12.2.1.4.0 and Oracle Database 19c.
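For example, a minimal sketch using simple authentication (the URL, principal, and credentials are hypothetical):
ldap.url=ldap://oid.example.com:3060/sales,cn=OracleContext,dc=example,dc=com
ldap.security.principal=cn=orcladmin
ldap.security.credentials=my-ldap-password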
Connection reuse through connection pooling¶
Use the connection.pool.* configuration properties to configure the connection pool for efficient reuse of connections to the database. The connector uses one connection to stream changes from the Oracle database. In addition, it uses one connection per table during the initial snapshot phase. Once the snapshot is complete, only task zero requires a connection to the database to stream database changes into the redo log topic.
Archived redo log destination¶
The connector provides a configuration to specify the archived redo log destination used to read the redo log files. Use the log.mining.archive.destination.name configuration to specify the name of the log archive destination to use when mining archived redo logs. You can configure the connector to use a specific destination using the destination name, for example, LOG_ARCHIVE_DEST_1.
Note that this feature is available from version 2.10.0 for Oracle 19c and in subsequent supported versions.
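For example, a minimal sketch that mines archived redo logs from the first archive destination:
log.mining.archive.destination.name=LOG_ARCHIVE_DEST_1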
Infrequently Updated Databases¶
The connector tracks offsets in the Connect offsets topic so that processing can be resumed from that point upon restarts. The offsets tracked by the connector store the SCN from which to resume. However, for the connector to be able to emit records to the offsets topic, it needs to process records of relevant tables. On an infrequently updated database where the source offsets are not able to move forward, a task restart can result in an ORA-01291 missing logfile or ORA-01292: no log file error if the archived redo log file corresponding to the stored SCN in the source offset has been purged from the database. In such scenarios, you can use the heartbeat feature (see the following note) to emit heartbeat events from the connector so the source offsets can move forward.
To use the heartbeat feature, set the heartbeat.interval.ms configuration property to a value greater than 0. This causes the connector to emit heartbeats to an internal heartbeat topic (configured using heartbeat.topic.name) at regular intervals. Confluent recommends you set the heartbeat.interval.ms parameter to a value on the order of minutes to hours. The default value of heartbeat.interval.ms is 0, which disables emission of heartbeat records from the connector.
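For example, a minimal sketch that emits a heartbeat every 15 minutes (the interval and topic name are illustrative):
heartbeat.interval.ms=900000
heartbeat.topic.name=oracle-cdc-heartbeat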
Note
- This feature is available from version 2.3.0 for Oracle 19c and in subsequent supported versions. For supported versions prior to Oracle 19c, use version 2.7.0.
- If there are long-running transactions, the SCN produced to the heartbeat topic will be the minimum SCN of all open transactions. This means that setting the configuration will not help for cases where a transaction doesn’t commit for periods longer than the retention size of the archive log files. Confluent recommends you monitor metrics such as oldest-transaction-id-in-buffer and last-processed-transaction-id, if available, as described in Monitoring using Java Management Extensions (JMX).
Log sensitive data¶
Use the log.sensitive.data configuration property to configure whether the connector logs or redacts sensitive information. Set this property to true only when logging sensitive data is acceptable for troubleshooting purposes. This property defaults to false, so all sensitive information is redacted by default.
Manage long-running transactions¶
This section is applicable only when the configuration use.transaction.begin.for.mining.session is set to true.
A long-running transaction can have a negative impact on connector performance. It can cause each subsequent mining session to do more work and therefore take more time to complete, leading to increased lag in processing change events. It can also increase the connector’s memory usage. The connector stores events corresponding to a transaction in an in-memory buffer until it receives the commit or rollback event for the transaction. A long-running transaction with a large number of changes can lead to increased memory usage and result in an out-of-memory error if there is not enough available memory.
As general guidance, Confluent recommends you avoid long-running transactions, since they can impact the scalability of the system and could lead to deadlocks. It is best to change your applications so that they don’t have such long-running transactions.
In situations where making changes to an application is not feasible, consider using the log.mining.transaction.age.threshold.ms and log.mining.transaction.threshold.breached.action configurations to drop long-running transactions. Use these properties with caution as dropping transactions could result in potential data loss.
Use the log.mining.transaction.age.threshold.ms configuration to define a threshold (in milliseconds) for the transaction age. The transaction age is defined as the period of time the transaction has been open on the database. If the transaction age exceeds this threshold, an action is taken depending on the value set in log.mining.transaction.threshold.breached.action. The default value is -1, which means that a transaction is retained in the buffer until the connector receives the commit or rollback event for the transaction. Do not set this property to a value greater than the retention period for archived redo log files.
Use the log.mining.transaction.threshold.breached.action configuration to control the action to take when an active transaction exceeds the threshold defined in the log.mining.transaction.age.threshold.ms configuration. For help with setting the log.mining.transaction.threshold.breached.action property, and for supported values, see its configuration description.
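For example, a minimal sketch that applies the threshold-breached action to transactions open for more than one hour (the threshold is illustrative; see the configuration reference for the supported log.mining.transaction.threshold.breached.action values):
use.transaction.begin.for.mining.session=true
log.mining.transaction.age.threshold.ms=3600000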
Important
This feature is available from version 2.10.0 for Oracle 19c and in subsequent supported versions.
Before state for update operation¶
For an update operation, the connector under its default configuration populates the state of the row after the change in the change event record. You can configure the connector to also include the state of the row before the change by specifying a non-empty value for the output.before.state.field configuration. The specified value is used as the field name under which the before state is populated in the change event record. For more information, see this example.
There are no changes to the change event records for snapshot, insert, and delete operations when using this feature.
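For example, a minimal sketch that nests the prior row state under a field named before (the field name is illustrative):
output.before.state.field=before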
Note
- This feature is available from version 2.13.0 for all supported versions of Oracle.
- If the value converter used is JSON_SR, then the schema evolution is not backward compatible.
- This is not supported for columns with large object (LOB) types.
Requirements and current limitations¶
The following sections provide usage requirements and current limitations.
Note
Be sure to review the Oracle Database Prerequisites before using the Oracle CDC Source connector.
Oracle versions¶
The Oracle CDC Source connector is compatible with the following Oracle versions:
- Oracle 19c Enterprise Edition (see Address DDL Changes in Oracle Database for Confluent Platform and the blog post Confluent’s Oracle CDC Connector Now Supports Oracle Database 19c)
- Oracle 21c Enterprise Edition (see Address DDL Changes in Oracle Database for Confluent Platform)
- Oracle 23ai (see Address DDL Changes in Oracle Database for Confluent Platform)
The connector supports Oracle Real Application Clusters (RAC). The connector has not been tested against Oracle Exadata.
Important
- The connector works with hosted databases in Oracle Cloud (OCI) if the databases are supported by the connector. The connector does not work with Oracle Autonomous Databases.
- The LogMiner API still exists in 19c and later supported versions, but Continuous Mining is no longer included.
- The connector works with Oracle Active Data Guard and Oracle Data Guard, but it must point to a primary database.
- If the UPDATE or DELETE statement in the Oracle redo log only includes ROWID in the WHERE clause (for example, UPDATE MYUSER.CUSTOMER SET FIRST_NAME = 'CONFLUENT' where ROWID = 'AAEm9+AM9AABrA7AAB';), the connector will not be able to create a full record. You can set behavior.on.unparsable.statement=log to skip the record.
- The connector does not support the multitenant container database for Amazon RDS.
Confluent Platform versions¶
The connector can be installed in Kafka Connect workers running Confluent Platform 5.3 (and later). It is recommended that you deploy the connector on Connect workers running Confluent Platform 6.0 (and later). In Confluent Platform 6.0 (and later), the connector can automatically distribute workloads across all of the connector’s tasks by assigning frequently-changing tables to different tasks.
Data types¶
For supported data types, see Supported Data Types.
The connector does not support the following data types and table storage attributes. If a table has columns with any of these unsupported data types, the connector ignores the entire table.
- BFILE
- Nested tables
- Objects with nested tables
- Tables with identity columns
- Temporal validity columns
- PKREF columns
- PKOID columns
- Nested table attributes and stand-alone nested table columns
The connector cannot differentiate between the numeric types INT, INTEGER, SMALLINT, DEC, DECIMAL, NUMBER, and NUMERIC. All of these numeric types are mapped to the Connect Decimal logical type. For more about this, see Kafka Connect Deep Dive. The connector also cannot differentiate between the floating-point types DOUBLE PRECISION, REAL, and FLOAT. All of these floating-point types are mapped to Connect FLOAT64.
Note
- You can use the numeric.mapping configuration property to map numeric types with known precision and scale to their best matching primitive type. The numeric.mapping property documentation describes the specific precision and scale required for each numeric type to map to a given Connect primitive type.
- A few of the PL/SQL-exclusive data types are not supported.
DDL statements¶
The connector recognizes and parses DDL statements applied to the database after the connector starts. These DDL statements are used to identify changes in the structure of captured tables and to adjust the schema of event records written to Kafka.
The connector’s DDL parser does not support the following DDL statements:
- ALTER TABLE statements to add or remove constraints.
- ALTER TABLE statements dropping multiple columns in a single statement.
- ALTER TABLE statements adding columns of TIMESTAMP type with DEFAULT.
- ALTER TABLE statements with columns that contain user-defined types.
- ALTER TABLE statements to rename tables or columns.
- ALTER TABLE statements to remove all data from a specified partition of a partitioned table.
- ALTER TABLE statements to modify LOB columns.
- ALTER TABLE statements with ADD/DROP PARTITION.
- ALTER TABLE statements to modify columns with NULL or NOT NULL that don’t contain the column type in the statement. Note that as a workaround, you can use the following statement:
  ALTER TABLE <TABLE> MODIFY <COLUMN> <COLUMN TYPE> NULL;
- GRANT statements to grant SELECT, UPDATE, INSERT, DELETE, and other privileges on tables.
- DROP TABLE with CASCADE CONSTRAINTS.
For help with handling DDL statements, see Address DDL Changes in Oracle Database for Confluent Platform.
Mutual TLS with DB authentication and PDBs¶
Mutual TLS with DB authentication works for CDBs, but does not work for PDBs.
With PDBs, there needs to be an external user identified as 'CN=...'. However, this external user is a global user and not a common user. When the connector is launched, it will fail with the following exception:
ORA-65053: A global user cannot change the container in the session.
Other considerations¶
- The Protobuf converter works with the connector when Confluent Platform 6.2.0 or above is used.
- Currently, the connector does not record transaction summary information.
Redo log¶
- If the Oracle redo log is corrupted, or if the Oracle redo log is incompatible with the current table schema, the connector sends the redo log block to the error log topic.
- Currently, the connector only supports writing to the redo log topic with one partition. All converted redo logs are sent to the same partition. If you create the redo log topic manually, create it as a single partition topic.
- Versions 2.0.0 and later of the connector use LogMiner with record.buffer.mode. The default setting is "record.buffer.mode"="connector", whereby uncommitted transactions are buffered in the connector’s memory. If you choose to keep the default setting and have large transactions, ensure you have a large amount of memory for your Connect workers. If not, consider breaking these large transactions into smaller transactions to avoid a potential out-of-memory issue. For additional information about transactions, see Filtering and Formatting Data Returned to V$LOGMNR_CONTENTS.
- The Oracle CDC Source connector does not work with an Oracle read-only replica for Amazon RDS. The connector uses the Oracle-recommended Online Catalog, which requires the database to be open for write access. For related details, see Working with an Oracle read-only replica for Amazon RDS.
- The Oracle CDC Source connector does not work with a DR cluster that doesn’t have the CURRENT status of a redo log. You can check whether redo log files with the CURRENT status exist by running SELECT SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT'.
Tables and columns¶
- Table or column names selected for mining must not exceed 30 characters.
- Renaming a column (RENAME) is not supported.
- When converting the DATE type to Unix Epoch time, the Oracle CDC Source connector ignores the time portion (hours:minutes:seconds) in the DATE field.
- When there is an update to the primary key, the Oracle CDC connector will generate a tombstone. For example, if you create a table with the following specifications:
create table CUSTOMERS (
  ID NUMBER(10) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 42) NOT NULL PRIMARY KEY,
  FIRST_NAME VARCHAR(50),
  LAST_NAME VARCHAR(50),
  EMAIL VARCHAR(50),
  GENDER VARCHAR(50),
  CLUB_STATUS VARCHAR(20),
  COMMENTS VARCHAR(90),
  CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  UPDATE_TS TIMESTAMP
);
Your primary key is ID. If you run the following update:
log "Updating primary key value"
docker exec -i oracle sqlplus C\#\#MYUSER/mypassword@//localhost:1521/ORCLCDB << EOF
update CUSTOMERS set ID = '123' where ID = '1';
exit;
EOF
This generates three records in the table topic:
{"ID":1,"FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"long":1657297016227},"UPDATE_TS":{"long":1657297016000},"COUNTRY":null,"table":{"string":"ORCLCDB.C##MYUSER.CUSTOMERS"},"scn":{"string":"2162249"},"op_type":{"string":"D"},"op_ts":{"string":"1657297083000"},"current_ts":{"string":"1657282688459"},"row_id":{"string":"AAAR34AAHAAAAFfAAA"},"username":{"string":"C##MYUSER"}}
null
{"ID":123,"FIRST_NAME":{"string":"Rica"},"LAST_NAME":{"string":"Blaisdell"},"EMAIL":{"string":"rblaisdell0@rambler.ru"},"GENDER":{"string":"Female"},"CLUB_STATUS":{"string":"bronze"},"COMMENTS":{"string":"Universal optimal hierarchy"},"CREATE_TS":{"long":1657297016227},"UPDATE_TS":{"long":1657297083000},"COUNTRY":null,"table":{"string":"ORCLCDB.C##MYUSER.CUSTOMERS"},"scn":{"string":"2162249"},"op_type":{"string":"I"},"op_ts":{"string":"1657297083000"},"current_ts":{"string":"1657282688460"},"row_id":{"string":"AAAR34AAHAAAAFfAAA"},"username":{"string":"C##MYUSER"}}
To ignore tombstones in your sink connector, use the TombstoneHandler SMT:
"transforms": "tombstoneHandlerExample",
"transforms.tombstoneHandlerExample.type": "io.confluent.connect.transforms.TombstoneHandler",
"transforms.tombstoneHandlerExample.behavior": "warn",
Numeric mapping¶
The Oracle CDC connector doesn’t support updating the numeric.mapping setting for a running connector. To update the setting, take the following actions:
- Delete the redo log topic, table-specific or CDC topics, registered schemas, and the running connector.
- Deploy a new connector with a new name.
SMT¶
Using single-message transformations (SMTs) is not recommended, unless the following conditions apply:
- The SMT is insensitive to record schema.
- The SMT does not alter the schema of records.
- The SMTs that alter schema are restricted to the table topics using predicates as demonstrated in the SMT Examples documentation.
Install the Oracle CDC Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
An installation of the Confluent Hub Client.
Note
The Confluent Hub Client is installed by default with Confluent Enterprise.
An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-oracle-cdc:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-oracle-cdc:2.14.0
You must add the orai18n.jar file to the /lib folder when installing this connector.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
Confluent’s Oracle CDC Source connector is a Confluent Premium connector subject to the Confluent enterprise license and therefore requires an additional subscription.
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a subscription to Confluent’s Oracle CDC Source connector, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for the Oracle CDC Source connector, see Configuration Reference for Oracle CDC Source Connector for Confluent Platform.
Quick Start¶
For quick start instructions, see Quick Start for the Oracle CDC Source Connector for Confluent Platform.
Creating Topics¶
Creating Kafka topics for records sourced from your database requires setting a few configuration properties.
Confluent Platform version 6.0 (or later)¶
If you are using Confluent Platform 6.0 (or later), you can configure your Connect worker to automatically create missing topics by adding properties to the worker and connector configuration.
Add the following configuration property to the Connect worker and then restart the worker.
topic.creation.enable=true
Add the following configuration properties to the connector configuration:
topic.creation.groups=redo
topic.creation.redo.include=your-redo-log-topic
topic.creation.redo.replication.factor=3
topic.creation.redo.partitions=1
topic.creation.redo.cleanup.policy=delete
topic.creation.redo.retention.ms=1209600000
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
topic.creation.default.cleanup.policy=compact
These properties define a topic creation rule called “redo” that creates a Kafka topic named your-redo-log-topic (the topic can have any name) with 1 partition and 3 replicas. The records for this topic can be deleted after 14 days (1209600000 milliseconds). You can change the replication factor and cleanup policy.
Note
The retention time needs to be longer than the maximum time the connector is allowed to be out of service.
All other topics are created with five partitions and three replicas. These topics have compaction enabled to remove any records for which there is a newer record with the same record key.
Confluent Platform version 5.5 (or earlier)¶
If you are using Confluent Platform 5.5 (or earlier) and the property auto.create.topics.enable=true is set in your Kafka broker configuration, the Kafka broker automatically creates any topics to which the Oracle CDC Source connector writes. The Kafka broker creates the topics using the following connector configuration properties:
- redo.log.topic.name
- redo.log.corruption.topic
- table.topic.name.template
If you are using Confluent Platform 5.5 (or earlier) and the property auto.create.topics.enable=false is set in your Kafka broker configuration, you must create the topics manually before running the connector. Create the topics before configuring the connector, and reference them with the following connector configuration properties:
- redo.log.topic.name
- redo.log.corruption.topic
- table.topic.name.template
Template variables¶
The connector uses template variables to create the name of the Kafka topic and the record key for each of the change events. The variables are similar to the Oracle GoldenGate Kafka Connect template variables, which simplifies migrating from Oracle GoldenGate to this connector. Variables are resolved at the task level and table level.
Connector and task variables¶
Variable keyword | Description |
---|---|
${connectorName} | Resolves to the name of the connector. |
${databaseName} | Resolves to the database name. |
${emptyString} | Resolves to an empty string. |
${staticMap[]} | Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside of the square braces, in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]} . |
${currentTimestamp} or ${currentTimestamp[]} | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp} , ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]} |
Table variables¶
Variable keyword | Description |
---|---|
${schemaName} | Resolves to the schema name for the table. |
${tableName} | Resolves to the short table name. |
${fullyQualifiedTableName} | Resolves to the fully-qualified table name including the period (.) delimiter between the schema and table names. For example, dbo.table1 . |
Column variables¶
Variable keyword | Description |
---|---|
${columnName} | Resolves to the column name. |
Record variables¶
Variable keyword | Description |
---|---|
${opType} | Resolves to the type of the operation: READ, INSERT, UPDATE, or DELETE. |
${opTimestamp} | Resolves to the operation timestamp from the redo log. |
${rowId} | Resolves to the ID of the changed row. |
${primaryKey} | Resolves to the concatenated primary key values delimited by an underscore (_ ) character. |
${primaryKeyStruct} | Resolves to a STRUCT with fields for each of the primary key column values. |
${primaryKeyStructOrValue} | Resolves to either a STRUCT with fields for each of the 2+ primary key column values, or the column value if the primary key contains a single column. |
${scn} | Resolves to the system change number (SCN) when the change was made. |
${cscn} | Resolves to the system change number (SCN) when the change was committed. |
${rbaseq} | Resolves to the sequence number associated with the Redo Block Address (RBA) of the redo record associated with the change. |
${rbablk} | Resolves to the RBA block number within the log file. |
${rbabyte} | Resolves to the RBA byte offset within the block. |
${currentTimestamp} or ${currentTimestamp[]} | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp} , ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]} |
Supported Data Types¶
The following table lists data types and the associated Connect mapping.
Oracle data type | SQL type code | Connect mapping |
---|---|---|
CHAR or CHARACTER | 1 | STRING |
LONG | 1 | STRING |
VARCHAR | 12 | STRING |
VARCHAR2 | 12 | STRING |
NCHAR | -15 | STRING |
NVARCHAR2 | -9 | STRING |
RAW | -3 | BYTES |
INT or INTEGER | 2 | DECIMAL |
SMALLINT | 2 | DECIMAL |
DEC or DECIMAL | 2 | DECIMAL |
NUMBER | 2 | DECIMAL |
NUMERIC | 2 | DECIMAL |
DOUBLE PRECISION | 6 | FLOAT64 |
FLOAT | 6 | FLOAT64 |
REAL | 6 | FLOAT64 |
TIMESTAMP WITH TIMEZONE | -101 | TIMESTAMP |
TIMESTAMP WITH LOCAL TIME ZONE | -102 | TIMESTAMP |
DATE | 91 | DATE |
BLOB | 2004 | BYTES |
CLOB | 2005 | BYTES |
NCLOB | 2011 | BYTES |
XMLTYPE | 2009 | BYTES |
Note
- The maximum length of a RAW column that is supported is 2000 bytes.
- The -101 and -102 codes for TIMESTAMP WITH TIMEZONE and TIMESTAMP WITH LOCAL TIMEZONE are Oracle-specific. BLOB, CLOB, NCLOB, and XMLTYPE are handled out-of-band with a separate LOB topic. The JSON data type in Oracle 21c is not supported.
- The VECTOR and BOOLEAN data types introduced in Oracle 23ai are not supported.
Using the Connector with Confluent Cloud¶
To run the Oracle CDC Source connector with Kafka topics in Confluent Cloud, see Capturing Redo logs and Snapshot with Supplemental logging only.
Note
The configuration examples are based on running the connector with Confluent Platform version 6.0 (and later).