Oracle XStream CDC Source Connector for Confluent Platform

The self-managed Oracle XStream CDC Source connector for Confluent Platform captures all changes made to rows in an Oracle database and represents the changes as change event records in Apache Kafka® topics. The connector uses Oracle’s XStream API to read changes from the database redo log.

Oracle XStream is a set of components and APIs in Oracle database that enables client applications, like the connector, to receive changes from an Oracle database.

The connector leverages XStream Out to capture both Data Manipulation Language (DML) and Data Definition Language (DDL) changes from the database redo log. When XStream Out is used, a capture process captures changes made to an Oracle database, converts the changes into Logical Change Records (LCRs), and sends the LCRs to an outbound server. The outbound server then sends the LCRs to the connector.

Note

The connector is built using the Debezium and Kafka Connect frameworks.

Features

The Oracle XStream CDC Source connector includes the following features:

Snapshot

When you start the connector for the first time, it takes a snapshot of the schema for each captured table and, optionally, captures a consistent snapshot of the current state of the rows for these tables. As part of this snapshot process, the connector acquires a lock (ROW SHARE MODE) on each of the captured tables. This lock, required only for capturing the table schema and not the row data, is hence held for a short duration. The connector uses an Oracle Flashback query to capture the state of the existing rows. You can customize the snapshot behavior by using the snapshot.mode configuration property.

If the connector is interrupted, stopped, or fails during the snapshot process of any tables, upon recovery or restart, the connector restarts all snapshots from the beginning. It is currently not possible to resume a snapshot of a table that is changing while ensuring that all changes to that table have been captured.

The connector supports parallel snapshots. The connector supports a single task, but you can use multiple threads to speed up the snapshot process. The connector distributes the captured tables across the threads, although it does not split a single table across multiple threads. The snapshot.max.threads configuration property controls the number of threads the connector uses during the initial snapshot. To enable parallel processing, you should set this property to a value greater than 1. By using parallel snapshots, the connector can process multiple tables at the same time, improving performance by distributing the workload across the available threads. Each thread uses a separate database connection.

Note

Tuning undo retention and tablespace

The connector uses Oracle Flashback Query for snapshot operations. A “snapshot too old” error (ORA-01555) can occur if the undo data required for a consistent view is overwritten and no longer available in the undo segments.

To prevent this, consider:

  • Increasing the UNDO_RETENTION parameter to retain undo data for a duration longer than the longest expected snapshot operation.
  • Ensuring the undo tablespace has enough space to support the configured undo retention.

If increasing undo retention and tablespace size is not feasible, enable Oracle Flashback Time Travel (Flashback Data Archive) on the source tables. With this setup, Oracle Flashback Query retrieves historical data from the archive instead of undo segments, provided the required data is available within the archive.

Streaming

After the initial snapshot is completed, the connector starts streaming changes for the specified tables. The connector streams changes from the Oracle database using Oracle’s XStream Out API. During this phase of operation:

  • The connector starts by attaching to the XStream outbound server specified in the database.out.server.name configuration property.
  • After successfully attaching to the outbound server, the connector receives changes made to the captured tables, and writes these changes as records to the appropriate change event topics in Kafka. Each change includes the full state of the row.

The connector receives changes from the database in transaction commit order. It ensures that events for each table are written to the change event topic in the same order as they occurred in the database.

Note

An XStream outbound server can support only one active client session at a time. This means multiple connectors cannot be attached to the same outbound server simultaneously. As a result, separate outbound servers must be configured for each connector.

Change event topics

The connector writes change events for all changes in a table to a specific Apache Kafka® topic dedicated to that table.

The connector uses two configuration properties to identify which tables to capture from the database:

  • The table.include.list configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should be captured.
  • The table.exclude.list configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should not be captured.

Note

The tables to be captured from the database must be specified in both the connector configuration (for example, using the table.include.list configuration property) and in the rule sets of the capture process and outbound server to which the connector is attached.

The connector can capture changes from tables across different schemas within the same database. A separate change event topic is created for each table being captured, ensuring that changes are streamed to distinct topics per table.

Schema changes

The connector stores the schema of captured tables over time in a dedicated topic, known as the schema history topic.

  • This topic is initially populated with the table schema during the initial snapshot.
  • It is subsequently updated as the connector processes DDL statements (like CREATE, ALTER) during the streaming phase.

Upon a connector restart, the connector reads from this topic to rebuild the schema of each captured table as it existed at the point in time when streaming resumes. This ensures that the connector can correctly interpret the change events based on the schema at the time the changes were made.

You can configure the name of the database schema history topic by using the schema.history.internal.kafka.topic configuration property. This topic should be configured with a single partition, infinite retention, and disabled compaction.

Note

The database schema history topic is intended for internal connector use only.

At-least-once delivery

The connector guarantees that records are delivered at least once to the Kafka topic.

Large object (LOB) type handling

The connector captures changes from tables that contain large object (LOB) columns, including binary large object (BLOB), character large object (CLOB), and national character large object (NCLOB) data types. The values are included in change events and sent to the corresponding change event topics.

Note

Oracle provides values for LOB columns only when they are explicitly set or changed in a SQL statement. As a result, if a LOB column is not modified, its value is not included in the change event. Instead, the connector inserts a placeholder value, which can be configured using the unavailable.value.placeholder property.

LOBs and Kafka message size limits

Kafka has a maximum message size limit that defines the largest message a producer can successfully send to a topic. By default, Kafka is optimized for small messages, but it also provides configuration options to adjust message sizes.

When capturing changes from tables with LOB columns, the resulting change event messages can sometimes exceed Kafka’s maximum message size limit. If this occurs, Kafka rejects the message, and the connector fails with a “record is too large” error.

To help manage message sizes, you can:

  • Enable compression at the producer level.
  • Use efficient serialization formats, such as Apache Avro or Protocol Buffers.

For more information, see Kafka message size limits documentation.

The connector offers configuration properties to manage LOB values that exceed a specified size threshold:

  • Use the lob.oversize.threshold property to define a size limit in bytes for LOB values.
  • Use the lob.oversize.handling.mode property to determine what happens when a LOB value exceeds this threshold:
    • fail (Default): The connector stops and reports an error.
    • skip: The connector replaces the oversized LOB value with a placeholder. You can customize this placeholder using the skip.value.placeholder property.

Note

A LOB value can be skipped only if its size is less than 2 GiB. If the value exceeds 2 GiB, the connector cannot skip it and will fail with an error.

You can control which columns are included in change event messages using the column.include.list and column.exclude.list configurations. These properties allow you to avoid processing LOB columns that are not required for downstream systems, reducing message size and improving performance.

Limitations

The following limitations apply to LOB support:

  • The connector does not support extended data types enabled by the MAX_STRING_SIZE initialization parameter.
  • The connector does not support LOB data types other than BLOB, CLOB, and NCLOB.
  • The connector does not capture changes resulting from piecewise LOB operations performed using procedures in the DBMS_LOB package (including the WRITE, TRIM, and ERASE procedures).
  • The connector does not capture changes resulting from fragment-based operations on SecureFiles LOB columns performed using procedures in the DBMS_LOB package (including the FRAGMENT_* procedures) or from SecureFiles archive manager operations.

Before and after state for change events

For update operations, the connector emits:

  • The state of the row before the update, with the original values.
  • The state of the row after the update, with the modified values.

For LOB columns, the connector handles change events for different operations as follows:

  • Insert operations: When a LOB column is inserted, the new LOB value appears in the after field.
  • Update operations: When a LOB column is updated, the new LOB value appears in the after field, while the before field contains an unavailable value placeholder.
  • Delete operations: When a LOB column is deleted, the before field contains the unavailable value placeholder.

Oracle multi-tenant architecture support

Each instance of the connector can capture tables from a single Pluggable Database (PDB). The PDB name, where the tables are located, can be configured using the database.pdb.name configuration property.

Note

If you need to read from tables in the Container Database (CDB), do not specify a value for the database.pdb.name configuration property.

Customizable data type handling

For certain data types, such as numeric, temporal, and binary large objects, you can customize how the connector maps them to Connect data types by modifying configuration properties. This allows for greater flexibility in handling different types of data, ensuring that the change events reflect the desired format and meet specific requirements.

Tombstone events

When a row is deleted in the source table, a delete change event is generated and sent to the Kafka topic. Subsequently, the connector emits a tombstone event with the same key as the original record, but with a null value. Tombstone records are used in Kafka’s log compaction process to ensure that only the most recent state of a record is retained in the log.

You can modify this behavior using the tombstones.on.delete configuration property.

Heartbeats

The connector periodically updates the outbound server with the position of the latest change it has processed, enabling the database to purge archived redo logs containing already processed transactions. However, if the database is inactive or no changes are being made to the captured tables, the connector cannot advance the position and update the outbound server.

Heartbeats are a mechanism that allows the connector to continue advancing the position even when the database is inactive or no changes are occurring to the captured tables. When enabled, the connector:

  • Creates a dedicated heartbeat topic.
  • Emits a simple event to this topic at regular intervals as needed.

This interval can be configured using the heartbeat.interval.ms configuration property. It is recommended to set the heartbeat.interval.ms configuration to a value with an order of minutes to hours. The default value of heartbeat.interval.ms is 0, which disables emission of heartbeat records from the connector.

Note

The heartbeat topic is intended for internal connector use only.

Automated error recovery

The connector has automated retries for handling various retriable errors. When a retriable error occurs, the connector automatically restarts in an attempt to recover. It will retry up to three times before stopping and entering a failed state, which requires user intervention to resolve.

The list of retriable errors is fixed and cannot be configured by the user.

Metrics

The connector provides built-in metrics for tracking the progress and performance of snapshot, streaming, and schema history processes. These metrics allow for better monitoring and insight into the connector’s operations, ensuring efficient data capture and processing.

Oracle Real Application Cluster (RAC) support

The connector fully supports Oracle RACs, enabling seamless integration with Oracle’s clustered databases, and ensuring high availability and fault tolerance.

Client-Side Field Level Encryption (CSFLE)

The connector supports CSFLE for sensitive data. For more information about CSFLE setup, see Manage CSFLE in Confluent Cloud for Self-Managed Connectors.

Requirements and current limitations

The following sections provides usage requirements and current limitations.

Oracle versions

The connector is compatible with the following Oracle versions:

  1. Oracle 19c Enterprise Edition
  2. Oracle 21c Enterprise Edition

The connector supports Oracle Exadata.

The connector supports Oracle Database 19c using the non-CDB architecture on Amazon RDS for Oracle. For more information, see Working with Amazon RDS for Oracle.

Java versions

The connector requires Java version 17 or higher.

Confluent Platform versions

The connector can be installed in Kafka Connect clusters running Confluent Platform 7.6 or higher.

Limitations

Be sure to review the following information:

  • The connector has not been tested against managed database services from cloud service providers (CSPs), other than Amazon RDS for Oracle.
  • The connector does not work with Oracle Autonomous Databases and Oracle Standby databases (using Oracle Data Guard).
  • The connector does not support Downstream Capture configurations.
  • The connector does not support the following Single Message Transforms (SMTs): GzipDecompress, TimestampRouter, and MessageTimestampRouter.

License

Confluent’s Oracle XStream CDC Source connector is a Confluent Premium connector subject to the Confluent Enterprise license and therefore requires an additional subscription.

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription to Confluent’s Oracle XStream CDC Source connector which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. For more information, contact Confluent Support if you are a subscriber.

For more information about license for self-managed connectors, see License for Self-Managed Connectors.

Note

Use of Confluent’s Oracle XStream CDC Source connector is not permitted under the Developer license.

Configuration properties

For a complete list of configuration properties for the Oracle XStream CDC Source connector, see Configuration Reference for Oracle XStream CDC Source Connector for Confluent Platform.

Supported data types

The connector creates change events for database changes. Each change event mirrors the table’s schema, with a field for every column value. The data type of each table column determines how the connector represents the column values in the corresponding change event fields.

For certain data types, such as numeric data types, you can customize how the connector maps them by modifying the default configuration settings. This allows more control over handling various data types, ensuring that the change events reflect the desired format and meet specific requirements.

Character data types

The following table describes how the connector maps character types.

Oracle data type Connect type
CHAR STRING
VARCHAR / VARCHAR2 STRING
NCHAR STRING
NVARCHAR STRING

In all cases, the connector ensures that character data is converted to a string type in Kafka Connect when creating change events.

Large object (LOB) types

You can adjust how the connector maps binary LOB data types by changing the binary.handling.mode configuration property.

The following table describes how the connector maps binary and character LOB types.

Oracle data type Connect type Notes
BLOB BYTES

Based on binary.handling.mode:

  • bytes: Represents as byte array (default)
  • base64: Represents as base64-encoded string
  • base64-url-safe: Represents as base64-url-safe-encoded string
  • hex: Represents as hex-encoded (base16) string
CLOB STRING  
NCLOB STRING  

Numeric data types

You can adjust how the connector maps numeric data types by changing the decimal.handling.mode configuration property.

The table below shows the mapping of numeric types when decimal.handling.mode is set to precise.

Oracle data type Connect type Notes
NUMBER(P, S <= 0) INT8 / INT16 / INT32 / INT64 / BYTES

Based on the precision and scale, the connector selects a matching Kafka Connect integer type:

  • If the precision minus the scale (P - S) is less than 3, it uses INT8.
  • If P - S is less than 5, it uses INT16.
  • If P - S is less than 10, it uses INT32.
  • If P - S is less than 19, it uses INT64.
  • If P - S is 19 or greater, it uses BYTES (org.apache.kafka.connect.data.Decimal).

NUMBER columns with a scale of 0 represent integer numbers. A negative scale indicates rounding in Oracle, for example, a scale of -2 causes rounding to hundreds.

NUMBER(P, S > 0) BYTES org.apache.kafka.connect.data.Decimal
NUMBER(P, [, * ]) STRUCT

io.debezium.data.VariableScaleDecimal

Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form.

SMALLINT, INT, INTEGER BYTES

org.apache.kafka.connect.data.Decimal

Oracle maps SMALLINT, INT and INTEGER to NUMBER(38,0). As a result, these types can hold values that exceed the maximum range of any of the INT types.

NUMERIC, DECIMAL INT8 / INT16 / INT32 / INT64 / BYTES Handles in the same way as the NUMBER data type (note that scale defaults to 0 for NUMERIC).

FLOAT[(P)]

Maps to FLOAT(126) when P not mentioned

STRUCT

io.debezium.data.VariableScaleDecimal

Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form.

REAL - Maps to FLOAT(63)

DOUBLE PRECISION - Maps to FLOAT(126)

STRUCT

io.debezium.data.VariableScaleDecimal

Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form.

BINARY_FLOAT FLOAT32  
BINARY_DOUBLE FLOAT64  

Note

When decimal.handling.mode is set to:

  • string: The Oracle numeric data types are mapped to the Kafka Connect STRING type.
  • double: The Oracle numeric data types are mapped to the Kafka Connect FLOAT64 type.

Temporal data types

You can adjust how the connector maps some of the temporal data types by changing the time.precision.mode configuration property.

The table below shows the mapping of temporal types:

Oracle data type Connect type Notes
DATE INT64

Based on time.precision.mode:

  • adaptive: io.debezium.time.Timestamp
  • connect: org.apache.kafka.connect.data.Timestamp
TIMESTAMP[(P)] INT64

Based on time.precision.mode:

adaptive:

If precision <= 3: io.debezium.time.Timestamp

  • Represents the number of milliseconds since the UNIX epoch, without timezone information.

Else if precision <= 6: io.debezium.time.MicroTimestamp

  • Represents the number of microseconds since the UNIX epoch, without timezone information.

Else: io.debezium.time.NanoTimestamp

  • Represents the number of nanoseconds since the UNIX epoch, without timezone information.

connect:

org.apache.kafka.connect.data.Timestamp

Represents the number of milliseconds since the UNIX epoch, without timezone information.

TIMESTAMP WITH TIMEZONE STRING

io.debezium.time.ZonedTimestamp

A string representation of a timestamp with timezone information.

TIMESTAMP WITH LOCAL TIMEZONE STRING

io.debezium.time.ZonedTimestamp

A string representation of a timestamp in UTC.

INTERVAL YEAR[(P)] TO MONTH STRING

io.debezium.time.Interval

The string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S.

INTERVAL DAY[(P)] TO SECOND[(FP)] STRING

io.debezium.time.Interval

The string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S.

Note

When time.precision.mode is set to connect, there could be a loss of precision if the fractional second precision of a column exceeds 3, because Oracle supports a higher level of precision than the logical types in Kafka Connect.

Oracle End User Terms

In addition to the terms of your applicable agreement with Confluent, your use of the Oracle XStream CDC Source connector for Confluent Platform is subject to the following flow down terms from Oracle:

  • You must provide Confluent with prior notice if you transfer, assign, or grant any rights or interests to another individual or entity with respect to your use of the Oracle XStream CDC Source connector for Confluent Platform.
  • You agree, to the extent permitted by applicable law, that Oracle has no liability for (a) any damages, whether direct, indirect, incidental, special, punitive or consequential, and (b) any loss of profits, revenue, data or data use, arising from the use of the programs with respect to your use of the Oracle XStream CDC Source connector for Confluent Platform.
  • You agree that Oracle is not required to perform any obligations to you as part of your use of the Oracle XStream CDC Source connector for Confluent Platform.
  • Only applicable if you are an end user at any government level. If Oracle suspends any authorization or licenses in connection with the Oracle XStream CDC Source connector for Confluent Platform, Confluent may immediately suspend your access to the Oracle XStream CDC Source connector for Confluent Platform until Confluent resolves the issue with Oracle.