Oracle XStream CDC Source Connector for Confluent Cloud

The fully-managed Oracle XStream CDC Source connector for Confluent Cloud captures all changes made to rows in an Oracle database and represents the changes as change event records in Apache Kafka® topics. The connector can be configured to capture changes from a subset of tables in a database by using an include regular expression to match the table identifiers. It can also be configured to not capture tables that match a separate exclude regular expression.

This Quick Start is for the fully-managed Confluent Cloud connector. If you are installing the connector locally for Confluent Platform, see Oracle XStream CDC Source for Confluent Platform.

If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.

Supported Versions

Be sure to review the following information before using the Oracle XStream CDC Source connector.

Oracle versions

The connector is compatible with the following Oracle versions:

  • Oracle 19c Enterprise Edition
  • Oracle 21c Enterprise Edition

Java versions

The connector requires Java version 17 or higher.

Limitations

Be sure to review the following information:

  • The connector has not been tested against Oracle Exadata and managed database services from cloud service providers (CSPs), such as OCI and RDS.
  • The connector does not work with Oracle Autonomous Databases and Oracle Standby databases (using Oracle Data Guard).
  • The connector does not support Downstream Capture configurations.
  • If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
  • If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
  • The connector does not support the following Single Message Transforms (SMTs): GzipDecompress, TimestampRouter, and MessageTimestampRouter.

Manage custom offsets

You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.

To manage offsets:

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
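
For example, you can issue this request with curl. The following is a minimal sketch that assumes you authenticate with a Confluent Cloud API key and secret authorized for the Connect API; the environment ID, cluster ID, and connector name are placeholders.

curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  "https://api.confluent.cloud/connect/v1/environments/env-abc123/clusters/lkc-xyz789/connectors/OracleXStreamSource_0/offsets"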

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

The following example shows the offset once the snapshot has been completed.

{
  "id": "lcc-example",
  "name": "{connector_name}",
  "offsets": [
    {
      "partition": {
        "server": "{topic_prefix}"
      },
      "offset": {
        "scn": "4374567",
        "snapshot": "INITIAL",
        "snapshot_completed": true
      }
    }
  ],
  "metadata": {
    "observed_at": "2025-03-01T12:30:00.151015100Z"
  }
}

The following example shows the offset once streaming is in progress.

{
  "id": "lcc-example",
  "name": "{connector_name}",
  "offsets": [
    {
      "partition": {
        "server": "{topic_prefix}"
      },
      "offset": {
        "lcr_position": "000000000044150e0000000100000001000000000044150d000000140000000102"
      }
    }
  ],
  "metadata": {
    "observed_at": "2025-03-01T12:30:00.151015100Z"
  }
}

Responses include the following information:

  • The position of the latest offset.
  • The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly will fetch more recently observed offsets.
  • Information about the connector.

JSON payload

You can use either an Oracle System Change Number (SCN) or Logical Change Record (LCR) position to update the offsets of the Oracle XStream CDC Source connector.

The following fields are unique to the JSON payload for managing offsets of the connector using an Oracle SCN:

  • scn (Required): The SCN of the change. It is set to the current SCN during the snapshot phase and obtained from the LCR position during the streaming phase. To manage offsets, set this to the SCN from which to resume streaming.
  • snapshot (Required): Indicates the type of snapshot. It is set to INITIAL for an initial consistent snapshot. To manage offsets, set this to INITIAL.
  • snapshot_completed (Required): Indicates whether the snapshot has been completed. To manage offsets, set this to true.

The following field is unique to the JSON payload for managing offsets of the connector using an LCR position:

  • lcr_position (Required): The position of the LCR, set during the streaming phase only. To manage offsets, set this to the LCR position from which to resume streaming.
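
To apply new offsets, the general pattern for fully-managed connectors is to submit an offsets request of type PATCH; see Manage Offsets for Fully-Managed Connectors in Confluent Cloud for the full procedure and any required connector state. The following curl sketch assumes that endpoint and uses placeholder IDs, the topic prefix from the earlier example, and an example SCN:

curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  -H "Content-Type: application/json" \
  -X POST "https://api.confluent.cloud/connect/v1/environments/env-abc123/clusters/lkc-xyz789/connectors/OracleXStreamSource_0/offsets/request" \
  -d '{
    "type": "PATCH",
    "offsets": [
      {
        "partition": { "server": "cflt" },
        "offset": { "scn": "4374567", "snapshot": "INITIAL", "snapshot_completed": true }
      }
    ]
  }'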

General considerations

  • An existing outbound server can be used if the SCN or LCR position used in the offsets request is equal to or greater than the outbound server’s processed low position. To find the processed low position for an outbound server, see Displaying the Processed Low Position for an Outbound Server, or run the query sketched at the end of this section.

  • An existing outbound server cannot be used if the SCN or LCR position used in the offsets request is earlier than the outbound server’s processed low position. In this case, a new capture process and outbound server must be created with a first SCN and start SCN that precede the SCN or LCR position used in the offset.

    • The first SCN and start SCN must be valid SCNs that are present in the redo log files available to the capture process.
    • The first SCN can be set to any value returned by the following query:
      • SELECT DISTINCT FIRST_CHANGE#, NAME FROM V$ARCHIVED_LOG WHERE DICTIONARY_BEGIN = 'YES'
      • The value returned in the NAME column indicates the redo log file containing the SCN corresponding to the first SCN. This redo log file, along with all subsequent redo log files, must be available to the capture process. If the query returns multiple distinct FIRST_CHANGE# values, choose the first SCN value that is most appropriate for the capture process you are creating.
    • The start SCN specified must be greater than or equal to the first SCN for the capture process.
  • To create a new connector with offsets, set the SCN in the offset and then start the connector in recovery snapshot mode using the snapshot.mode configuration property. The connector will first capture a snapshot of the schema for the capture tables, populating the schema history topic, and then begin streaming from the specified SCN in the offset. Once the recovery process is complete and streaming begins, reset the snapshot.mode to either initial or no_data to prevent the connector from initiating a recovery upon future restarts.

    Important

    The connector will fail if there have been schema changes to the captured tables after the specified SCN.

  • To update the offsets of an existing connector, set the SCN or LCR position in the offset.

    Important

    The connector will fail if there have been schema changes to the capture tables between the SCN or LCR position specified in the offsets request and the connector’s last processed SCN or LCR position.
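
To check an outbound server’s processed low position before deciding which of the cases above applies (see the first consideration in this section), you can query the database as the XStream administrator. The following is a sketch only; verify the DBA_XSTREAM_OUTBOUND_PROGRESS view and column names against the Oracle documentation referenced above.

SELECT SERVER_NAME, PROCESSED_LOW_POSITION
  FROM DBA_XSTREAM_OUTBOUND_PROGRESS;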

Migrate connectors

Consider the following before migrating a self-managed Oracle XStream CDC Source connector to the fully-managed connector:

  • The self-managed connector must be operating in streaming mode. If the self-managed connector is still taking a snapshot, you can either create a new connector on Confluent Cloud that starts the snapshot process from the beginning, or wait for the snapshot process to complete and follow the migration guidance (see the offsets sketch after this list).

  • The configurations of the self-managed connector must match the configurations of the fully-managed connector. You need to set the snapshot.mode configuration property to recovery. This ensures that the connector will first capture a snapshot of the schema for the capture tables, populating the schema history topic, and then begin streaming from the specified SCN in the offset.

    Important

    The connector will fail if there have been schema changes to the captured tables after the specified SCN.
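
Before creating the fully-managed connector, you can read the current streaming offset (SCN or LCR position) from the self-managed connector and use it in the offsets request for the new connector. The following is a minimal sketch that assumes your self-managed Kafka Connect cluster runs a version that exposes the KIP-875 offsets endpoint (Kafka Connect 3.5 or later); the host and connector name are placeholders.

curl -s "http://connect.example.com:8083/connectors/oracle-connector/offsets"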

Quick Start

Use this quick start to get up and running with the Confluent Cloud Oracle XStream CDC Source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in an Oracle database and then monitoring and recording all subsequent row-level changes.

Before configuring the connector, see Oracle Database Prerequisites for Oracle database configuration information and post-configuration validation steps.

Prerequisites

  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
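
For example, the following Confluent CLI sketch creates a service account and an API key scoped to a Kafka cluster; the cluster ID and service account ID are placeholders, and you still need to grant the ACLs described in the service account documentation.

confluent iam service-account create "oracle-xstream-connector" \
  --description "Service account for the Oracle XStream CDC Source connector"
confluent api-key create --resource lkc-xyz789 --service-account sa-abc123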

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

To create and launch a Kafka cluster in Confluent Cloud, see Create a Kafka cluster in Confluent Cloud.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the Oracle XStream CDC Source connector card.

Oracle XStream CDC Source Connector Card

Step 4: Enter the connector details

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

At the Add Oracle XStream CDC Source Connector screen, complete the following:

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
    • My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
    • Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
    • Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.

Note

Freight clusters support only service accounts for Kafka authentication.

  2. Click Continue.

If the connector is not running, see Oracle Database Prerequisites and review the Oracle database configuration information and post-configuration validation steps.

Step 5: Check the Kafka topic

After the connector is running, verify that records are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

  • Make sure you have all your prerequisites completed.
  • The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

For example:

confluent connect plugin describe OracleXStreamSource

Example output:

The following are required configs:
connector.class : OracleXStreamSource
database.dbname
database.hostname
database.service.name
database.out.server.name
database.user
database.password
name
output.data.value.format
tasks.max
topic.prefix
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties to deploy an instance of the connector.

The connector setup below performs the following:

  • Connects to the ORCLPDB1 pluggable database located at db.example.com on port 1521.
  • Initiates a snapshot of the employees table in the sample schema within the ORCLPDB1 pluggable database.
  • After snapshot completion, the connector listens for changes made to the employees table through the XOUT outbound server.
  • Streams the changes to the Kafka topic, cflt.SAMPLE.EMPLOYEES.
{
   "name": "oracle-connector",
   "config": {
     "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
     "tasks.max" : "1",
     "database.hostname": "db.example.com",
     "database.port": "1521",
     "database.user": "C##CFLTUSER",
     "database.password": "secret",
     "database.dbname": "ORCLCDB",
     "database.service.name": "ORCLCDB",
     "database.pdb.name": "ORCLPDB1",
     "database.out.server.name": "XOUT",
     "table.include.list": "SAMPLE.EMPLOYEES",
     "topic.prefix": "cflt",
     "kafka.auth.mode": "KAFKA_API_KEY",
     "kafka.api.key": "****************",
     "kafka.api.secret": "**************************************************"
   }
 }

Single Message Transforms: To add a new SMT using the CLI, see Add transforms. For more information about the Debezium SMT ExtractNewRecordState, see Debezium transformations.
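
For example, to emit only the after state of each row, you could add the Debezium ExtractNewRecordState SMT to the connector configuration. The following JSON fragment is a sketch; the transform alias unwrap is arbitrary, and you should confirm the supported options in the Add transforms and Debezium transformations documentation.

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"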

See Configuration Properties for all properties and definitions.

Step 4: Load the properties file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file oracle-xstream-cdc-source.json

Example output:

Created connector OracleXStreamSource_0 lcc-ix4dl

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID         |            Name       | Status  |  Type
+----------+-----------------------+---------+--------+
lcc-ix4dl  | OracleXStreamSource_0 | RUNNING | source

Step 6: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.

If the connector is not running, see Oracle Database Prerequisites for Oracle XStream CDC Source Connector for Confluent Cloud and review the Oracle database configuration information and post-configuration validation steps.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Configuration Properties

Use the following configuration properties with the fully-managed Oracle XStream CDC Source connector.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high
kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with the Kafka cluster.

  • Type: string
  • Importance: high
kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string
  • Default: default
  • Importance: medium

How should we connect to your database?

database.hostname

IP address or hostname of the Oracle database server.

  • Type: string
  • Importance: high
database.port

Port number of the Oracle database server.

  • Type: int
  • Default: 1521
  • Valid Values: [1,…,65535]
  • Importance: high
database.user

Name of the Oracle database user to use when connecting to the database.

  • Type: string
  • Valid Values: Must match the regex ^[^\?=%&\(\)]*$
  • Importance: high
database.password

Password of the Oracle database user to use when connecting to the database.

  • Type: password
  • Importance: high
database.dbname

Name of the database to connect to. In a multitenant container database, this is the name of the container database (CDB).

  • Type: string
  • Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#_]*)*$
  • Importance: high
database.service.name

Name of the database service to which to connect. In a multitenant container database, this is the service used to connect to the container database (CDB). For Oracle Real Application Clusters (RAC), use the service created by Oracle XStream.

  • Type: string
  • Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#._]*)*$
  • Importance: high
database.pdb.name

Name of the pluggable database to connect to in a multitenant architecture. The container database (CDB) name must be given via database.dbname in this case. This configuration should not be specified when connecting to a non-container database.

  • Type: string
  • Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#_]*)*$
  • Importance: high
database.out.server.name

Name of the XStream outbound server to connect to.

  • Type: string
  • Importance: high
database.tls.mode

Specifies whether to use Transport Layer Security (TLS) to connect to the Oracle database. Select one of the following options:

disable (default): Does not use a TLS connection.

one-way: Uses a TLS encrypted connection and also verifies the server’s TLS certificate against the configured Certificate Authority (CA) certificates.

  • Type: string
  • Default: disable
  • Valid Values: disable, one-way
  • Importance: medium
database.processor.licenses

Specifies the number of Oracle processor licenses required for the source database server or cluster. This is determined by multiplying the total number of processor cores by a core processor licensing factor, as specified in the Oracle Processor Core Factor Table.

  • Type: int
  • Valid Values: [1,…,4096]
  • Importance: medium

Output messages

output.key.format

Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured when using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: AVRO
  • Valid Values: AVRO, JSON_SR, PROTOBUF
  • Importance: high
output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured when using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: AVRO
  • Valid Values: AVRO, JSON_SR, PROTOBUF
  • Importance: high

How should we name your topic(s)?

topic.prefix

Topic prefix that provides a namespace for the Oracle database server or cluster from which the connector captures changes. The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events from this connector. Only alphanumeric characters, hyphens, dots and underscores are accepted.

Warning: Do not change the value of this property. If you change the value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value. The connector is also unable to recover its database schema history topic.

  • Type: string
  • Importance: high

Connector configuration

table.include.list

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you want to capture. When this property is set, the connector will only capture changes from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes from all non-system tables in each captured database. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.exclude.list property.

  • Type: string
  • Importance: high
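
For example (hypothetical table names), the following fragment captures the EMPLOYEES table plus every table in the SAMPLE schema whose name starts with DEPT_; because matching is anchored, the second expression matches SAMPLE.DEPT_HISTORY but not, for example, HR.DEPT_HISTORY:

"table.include.list": "SAMPLE\\.EMPLOYEES,SAMPLE\\.DEPT_.*"
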
table.exclude.list

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you do not want to capture. When this property is set, the connector captures changes from any table that is not specified in the exclude list. Each identifier is of the form schemaName.tableName. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.include.list property.

  • Type: string
  • Importance: high
snapshot.mode

The criteria for running a snapshot upon startup of the connector. Select one of the following snapshot options:

initial (default): The snapshot includes both the structure (schema) and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables. After the snapshot completes, the connector begins to stream event records for subsequent database changes.

no_data: The snapshot includes only the structure (schema) of captured tables. Specify this value if you want the connector to capture data only for changes that occur after the snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.

recovery: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. WARNING: Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.

  • Type: string
  • Default: initial
  • Valid Values: initial, no_data, recovery
  • Importance: medium
schema.history.internal.skip.unparseable.ddl

A boolean value that specifies whether the connector should ignore a DDL statement that cannot be parsed or stop processing for a human to address the issue. The safe default is false which causes the connector to fail when it encounters an unparseable DDL statement. Setting the value to true should be done with care as it will cause the connector to skip processing any DDL statement it cannot parse, and this could potentially lead to schema mismatches and data loss.

  • Type: boolean
  • Default: false
  • Importance: low
snapshot.database.errors.max.retries

Specifies the number of retry attempts the connector will make to snapshot a table if a database error occurs. This configuration property currently only retries failures related to the ORA-01466 error. By default, no retries are attempted.

  • Type: int
  • Default: 0
  • Valid Values: [0,…,3]
  • Importance: low
tombstones.on.delete

Controls whether a delete event is followed by a tombstone event. The following values are possible:

true: For each delete operation, the connector emits a delete event and a subsequent tombstone event.

false: For each delete operation, the connector emits only a delete event.

After a source record is deleted, a tombstone event (the default behavior) enables Kafka to completely delete all events that share the key of the deleted row in topics that have log compaction enabled.

  • Type: boolean
  • Default: true
  • Importance: medium
skipped.operations

A comma-separated list of operations to skip during streaming. You can configure the connector to skip the following types of operations: c (inserts/create), u (updates), d (deletes), t (truncates), and none to indicate nothing is skipped. The default value is t, ensuring that only truncate operations are skipped.

  • Type: string
  • Default: t
  • Importance: low
schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:

none (the default) does not apply any adjustment.

avro replaces the characters that cannot be used in the Avro type name with underscore.

avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.

  • Type: string
  • Default: none
  • Valid Values: avro, avro_unicode, none
  • Importance: low
field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:

none (the default) does not apply any adjustment.

avro replaces the characters that cannot be used in the Avro type name with underscore.

avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.

  • Type: string
  • Default: none
  • Valid Values: avro, avro_unicode, none
  • Importance: low
heartbeat.interval.ms

Controls how often the connector sends heartbeat messages to a heartbeat topic. It is useful in situations when no changes occur in the captured tables for an extended period. In such cases, there are no change event messages generated, causing the committed source offset to remain unchanged. As a result, the connector is unable to update the processed low watermark on the outbound server which could result in the database retaining archived redo log files longer than needed. The default value is 0 which disables the heartbeat mechanism.

  • Type: int
  • Default: 0
  • Valid Values: [0,…]
  • Importance: medium
database.os.timezone

Specifies the database server’s operating system timezone. This is used to read the time when the LCR was generated at the source database. The default timezone is UTC. The value has to be a valid java.time.ZoneId identifier.

  • Type: string
  • Default: UTC
  • Importance: low

How should we handle data types?

decimal.handling.mode

Specifies how the connector should handle NUMBER, DECIMAL and NUMERIC columns. You can set one of the following options:

precise (the default): Uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect’s org.apache.kafka.connect.data.Decimal type. Depending on the precision and scale, the most appropriate Kafka Connect integer type is used for integral values, ensuring that the value is represented without any loss of precision.

string: Encodes values as formatted strings. Using the string option is easier to consume, but results in a loss of semantic information about the real type.

double: Represents values using Java’s double. Using double values is easier, but can result in a loss of precision.

  • Type: string
  • Default: precise
  • Valid Values: double, precise, string
  • Importance: medium
time.precision.mode

Specifies how the connector should handle time, date, and timestamp columns. You can set one of the following options:

adaptive (the default): Bases the precision of time, date, and timestamp values on the database column’s precision.

connect: Always represents time, date, and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision.

  • Type: string
  • Default: adaptive
  • Valid Values: adaptive, connect
  • Importance: medium

Number of tasks for this connector

tasks.max

Specifies the maximum number of tasks for the connector. Since this connector supports only a single task, the maximum is capped at 1.

  • Type: int
  • Default: 1
  • Valid Values: [1,…,1]
  • Importance: high

Auto-restart policy

auto.restart.on.user.error

Enable connector to automatically restart on user-actionable errors.

  • Type: boolean
  • Default: true
  • Importance: medium

Additional Configs

header.converter

The converter class for the headers. This is used to serialize and deserialize the headers of the messages.

  • Type: string
  • Importance: low
producer.override.compression.type

The compression type for all data generated by the producer.

  • Type: string
  • Importance: low
value.converter.allow.optional.map.keys

Allow optional string map key when converting from Connect Schema to Avro Schema. Applicable for Avro Converters.

  • Type: boolean
  • Importance: low
value.converter.auto.register.schemas

Specify if the Serializer should attempt to register the Schema.

  • Type: boolean
  • Importance: low
value.converter.connect.meta.data

Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.

  • Type: boolean
  • Importance: low
value.converter.enhanced.avro.schema.support

Enable enhanced schema support to preserve package information and Enums. Applicable for Avro Converters.

  • Type: boolean
  • Importance: low
value.converter.enhanced.protobuf.schema.support

Enable enhanced schema support to preserve package information. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.flatten.unions

Whether to flatten unions (oneofs). Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.generate.index.for.unions

Whether to generate an index suffix for unions. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.generate.struct.for.nulls

Whether to generate a struct variable for null values. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.int.for.enums

Whether to represent enums as integers. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.latest.compatibility.strict

Verify latest subject version is backward compatible when use.latest.version is true.

  • Type: boolean
  • Importance: low
value.converter.object.additional.properties

Whether to allow additional properties for object schemas. Applicable for JSON_SR Converters.

  • Type: boolean
  • Importance: low
value.converter.optional.for.nullables

Whether nullable fields should be specified with an optional label. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.optional.for.proto2

Whether proto2 optionals are supported. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.use.latest.version

Use latest version of schema in subject for serialization when auto.register.schemas is false.

  • Type: boolean
  • Importance: low
value.converter.use.optional.for.nonrequired

Whether to set non-required properties to be optional. Applicable for JSON_SR Converters.

  • Type: boolean
  • Importance: low
value.converter.wrapper.for.nullables

Whether nullable fields should use primitive wrapper messages. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.wrapper.for.raw.primitives

Whether a wrapper message should be interpreted as a raw primitive at root level. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
key.converter.key.subject.name.strategy

How to construct the subject name for key schema registration.

  • Type: string
  • Default: TopicNameStrategy
  • Importance: low
value.converter.decimal.format

Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:

BASE64 to serialize DECIMAL logical types as base64 encoded binary data and

NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.

  • Type: string
  • Default: BASE64
  • Importance: low
value.converter.flatten.singleton.unions

Whether to flatten singleton unions. Applicable for Avro and JSON_SR Converters.

  • Type: boolean
  • Default: false
  • Importance: low
value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string
  • Default: DefaultReferenceSubjectNameStrategy
  • Importance: low
value.converter.value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string
  • Default: TopicNameStrategy
  • Importance: low

Connect to an Oracle Real Application Cluster (RAC) Database

Confluent recommends configuring the following properties to ensure that the connector will be able to connect and attach to the specific RAC instance running the XStream components:

  • Configure the database.hostname property to the Oracle RAC database SCAN address.

    Note

    If a SCAN address is unavailable, configure the database.hostname property to the hostname of the instance where the XStream components are running. You will need to manually reconfigure the connector whenever the instance running the XStream components changes.

  • Configure the database.service.name property to the auto-created Oracle XStream service.

Supported Data Types

The connector creates change events for database changes. Each change event mirrors the table’s schema, with a field for every column value. The data type of each table column determines how the connector represents the column values in the corresponding change event fields.

For certain data types, such as numeric data types, you can customize how the connector maps them by modifying the default configuration settings. This allows more control over handling various data types, ensuring that the change events reflect the desired format and meet specific requirements.

Character data types

The following table describes how the connector maps character types.

Oracle data type      Connect type
CHAR                  STRING
VARCHAR / VARCHAR2    STRING
NCHAR                 STRING
NVARCHAR              STRING

In all cases, the connector ensures that character data is converted to a string type in Kafka Connect when creating change events.

Numeric data types

You can adjust how the connector maps numeric data types by changing the decimal.handling.mode configuration property.

The following shows the mapping of numeric types when decimal.handling.mode is set to precise.

NUMBER(P, S <= 0)

Connect type: INT8 / INT16 / INT32 / INT64 / BYTES

Based on the precision and scale, the connector selects a matching Kafka Connect integer type:

  • If the precision minus the scale (P - S) is less than 3, it uses INT8.
  • If P - S is less than 5, it uses INT16.
  • If P - S is less than 10, it uses INT32.
  • If P - S is less than 19, it uses INT64.
  • If P - S is 19 or greater, it uses BYTES (org.apache.kafka.connect.data.Decimal).

NUMBER columns with a scale of 0 represent integer numbers. A negative scale indicates rounding in Oracle, for example, a scale of -2 causes rounding to hundreds.

NUMBER(P, S > 0)

Connect type: BYTES (org.apache.kafka.connect.data.Decimal)

NUMBER(P[, *])

Connect type: STRUCT (io.debezium.data.VariableScaleDecimal)

Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form.

SMALLINT, INT, INTEGER

Connect type: BYTES (org.apache.kafka.connect.data.Decimal)

Oracle maps SMALLINT, INT and INTEGER to NUMBER(38,0). As a result, these types can hold values that exceed the maximum range of any of the INT types.

NUMERIC, DECIMAL

Connect type: INT8 / INT16 / INT32 / INT64 / BYTES

Handled in the same way as the NUMBER data type (note that scale defaults to 0 for NUMERIC).

FLOAT[(P)] (maps to FLOAT(126) when P is not specified)

Connect type: STRUCT (io.debezium.data.VariableScaleDecimal)

Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form.

REAL (maps to FLOAT(63)) and DOUBLE PRECISION (maps to FLOAT(126))

Connect type: STRUCT (io.debezium.data.VariableScaleDecimal)

Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form.

BINARY_FLOAT

Connect type: FLOAT32

BINARY_DOUBLE

Connect type: FLOAT64

Note

When decimal.handling.mode is set to:

  • string: The Oracle numeric data types are mapped to the Kafka Connect STRING type.
  • double: The Oracle numeric data types are mapped to the Kafka Connect FLOAT64 type.
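
As a worked example with hypothetical table and column names, the following DDL shows how columns are mapped when decimal.handling.mode is left at the default precise; with string, all four columns would instead be emitted as STRING.

CREATE TABLE sample.numeric_demo (
  order_id  NUMBER(9,0),   -- P - S = 9  -> INT32
  account   NUMBER(20,0),  -- P - S = 20 -> BYTES (org.apache.kafka.connect.data.Decimal)
  price     NUMBER(10,2),  -- S > 0      -> BYTES (Decimal with scale 2)
  ratio     FLOAT          -- FLOAT(126) -> STRUCT (io.debezium.data.VariableScaleDecimal)
);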

Temporal data types

You can adjust how the connector maps some of the temporal data types by changing the time.precision.mode configuration property.

The following shows the mapping of temporal types.

DATE

Connect type: INT64

Based on time.precision.mode:

  • adaptive: io.debezium.time.Timestamp
  • connect: org.apache.kafka.connect.data.Timestamp

TIMESTAMP[(P)]

Connect type: INT64

Based on time.precision.mode:

  • adaptive, precision <= 3: io.debezium.time.Timestamp, the number of milliseconds since the UNIX epoch, without timezone information.
  • adaptive, precision <= 6: io.debezium.time.MicroTimestamp, the number of microseconds since the UNIX epoch, without timezone information.
  • adaptive, precision > 6: io.debezium.time.NanoTimestamp, the number of nanoseconds since the UNIX epoch, without timezone information.
  • connect: org.apache.kafka.connect.data.Timestamp, the number of milliseconds since the UNIX epoch, without timezone information.

TIMESTAMP WITH TIMEZONE

Connect type: STRING (io.debezium.time.ZonedTimestamp)

A string representation of a timestamp with timezone information.

TIMESTAMP WITH LOCAL TIMEZONE

Connect type: STRING (io.debezium.time.ZonedTimestamp)

A string representation of a timestamp in UTC.

INTERVAL YEAR[(P)] TO MONTH

Connect type: STRING (io.debezium.time.Interval)

A string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S.

INTERVAL DAY[(P)] TO SECOND[(FP)]

Connect type: STRING (io.debezium.time.Interval)

A string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S.

Note

When time.precision.mode is set to connect, there could be a loss of precision if the fractional second precision of a column exceeds 3, because Oracle supports a higher level of precision than the logical types in Kafka Connect.

Security

Native Network Encryption

Oracle database provides native network encryption and integrity to ensure data is secure during transit, without the need for setting up Transport Layer Security (TLS).

Encryption and integrity are managed based on a combination of client-side and server-side encryption settings, which can be configured using parameters in the sqlnet.ora configuration file. For more information on configuring Oracle Advanced Security for network encryption and integrity, see the Support for Network Encryption and Integrity section in the Oracle Database JDBC Developer’s Guide.

The connector uses the Oracle JDBC OCI driver to communicate with the Oracle database. You can use the database’s native network encryption and data integrity to securely transmit data between the connector and the Oracle database. For more information on relevant configuration settings, see Table 9-2 OCI Driver Client Parameters for Encryption and Integrity in the Oracle Database JDBC Developer’s Guide.

The following configurations are set on the connector:

  • SQLNET.ALLOW_WEAK_CRYPTO parameter is set to FALSE to ensure that the connector uses strong algorithms when communicating with the Oracle database.
  • SQLNET.ENCRYPTION_CLIENT and SQLNET.CRYPTO_CHECKSUM_CLIENT parameters are set to ACCEPTED (the default value).

To enable network encryption and integrity, configure the SQLNET.ENCRYPTION_SERVER and SQLNET.CRYPTO_CHECKSUM_SERVER parameters on the server to either REQUESTED or REQUIRED. Additionally, specify strong encryption and crypto-checksum algorithms by setting the SQLNET.ENCRYPTION_TYPES_SERVER and SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER parameters. For more information, see the Improving Native Network Encryption Security section in the Oracle Database Security Guide.
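
For example, a minimal server-side sqlnet.ora sketch that requires strong encryption and integrity could look like the following; confirm the algorithm choices against the Oracle Database Security Guide before applying them.

SQLNET.ENCRYPTION_SERVER = REQUIRED
SQLNET.ENCRYPTION_TYPES_SERVER = (AES256)
SQLNET.CRYPTO_CHECKSUM_SERVER = REQUIRED
SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER = (SHA256)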

Transport Layer Security (TLS)

You can configure Transport Layer Security (TLS) to secure connections between the client (connector) and the Oracle database. Currently, only one-way TLS, without client wallets, is supported.

In one-way TLS, the database server presents a certificate to authenticate itself to the client (connector). The client needs access to the trusted Certificate Authority (CA) root certificate that signed the server’s certificate to verify it. Currently, the connector only supports certificates signed by well-known CAs, where the corresponding CA certificate is present in the default certificate store of the system running the connector.

For more information on configuring TLS, see the Configuring Transport Layer Security Authentication chapter of the Oracle Database Security Guide.

You can enable TLS connections between the connector and the Oracle database server by using the database.tls.mode configuration property. Set database.tls.mode to one-way to enable TLS encryption and server authentication.

Note

When database.tls.mode is set to one-way, ensure that the port specified in database.port corresponds to the listener on the server that supports TLS connections.
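
For example, the relevant fragment of a connector configuration using TLS might look like the following; port 2484 is only the conventional TCPS listener port, so use the port your TLS-enabled listener actually uses.

"database.tls.mode": "one-way",
"database.port": "2484"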

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
