Configuration Reference for Oracle XStream CDC Source Connector for Confluent Platform

The sections on this page describe each of the Oracle XStream CDC Source connector configuration properties.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see Oracle XStream CDC Source Connector for Confluent Cloud.

Connector

name

Unique name for the connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high
connector.class

Java class name for the connector. It is always set to io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector.

  • Type: string
  • Default: NA
  • Importance: high
tasks.max

The maximum number of tasks for this connector. This configuration does not apply to the Oracle connector because it always operates with a single task.

  • Type: int
  • Default: 1
  • Valid Values: [1,…,1]
  • Importance: high
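
The connector-level properties above typically appear together at the top of a connector configuration. The following is a minimal, hypothetical sketch of that fragment; the connector name is an example only.

    {
      "name": "oracle-xstream-source",
      "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
      "tasks.max": "1"
    }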

Database

database.hostname

IP address or hostname of the Oracle database server.

  • Type: string
  • Valid Values: Must match the regex ^[a-zA-Z0-9-_.]+$
  • Importance: high
database.port

Port number of the Oracle database server.

  • Type: int
  • Default: 1521
  • Valid Values: [1,…,65535]
  • Importance: high
database.user

Name of the Oracle database user to use when connecting to the database.

  • Type: string
  • Importance: high
database.password

Password of the Oracle database user to use when connecting to the database.

  • Type: password
  • Importance: high
database.dbname

Name of the database to connect to. In a multitenant architecture, this is the name of the container database (CDB).

  • Type: string
  • Importance: high
database.service.name

Name of the database service to which to connect. In a multitenant container database, this is the service used to connect to the CDB. For Oracle Real Application Clusters (RAC), use the service created by Oracle XStream.

  • Type: string
  • Importance: high
database.pdb.name

Name of the pluggable database to connect to in a multitenant architecture. The container database (CDB) name must be given via database.dbname in this case. This configuration should not be specified when connecting to a non-container database.

  • Type: string
  • Importance: high
database.out.server.name

Name of the XStream outbound server to connect to.

  • Type: string
  • Importance: high
database.os.timezone

Specifies the database server’s operating system timezone. This is used to read the time when the LCR (logical change record) was generated at the source database. The default timezone is UTC. The value must be a valid java.time.ZoneId identifier, such as America/New_York.

  • Type: string
  • Default: UTC
  • Importance: low
database.processor.licenses

Specifies the number of Oracle processor licenses required for the source database server or cluster. This value is determined by multiplying the total number of processor cores by the core processor licensing factor, as specified in the Oracle Processor Core Factor Table.

  • Type: int
  • Default: NA
  • Importance: medium
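
For example, assuming a hypothetical server with 16 processor cores and a core processor licensing factor of 0.5, the required value would be 16 × 0.5 = 8 processor licenses. Confirm the core factor that applies to your hardware in the Oracle Processor Core Factor Table.
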
database.tls.mode

Specifies whether to use Transport Layer Security (TLS) to connect to the Oracle database.

  • Type: enum
  • Default: disable

You can set one of the following values:

  • disable (default): Does not use a TLS connection.
  • one-way: Uses a TLS-encrypted connection and verifies the server’s TLS certificate against the configured Certificate Authority (CA) certificates.
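
Taken together, the database connection properties map onto a fragment of the connector configuration. The following hypothetical sketch shows a multitenant (CDB/PDB) connection with one-way TLS enabled; all names, credentials, and the port are placeholder values.

    {
      "database.hostname": "oracle.example.com",
      "database.port": "2484",
      "database.user": "C##CFLTUSER",
      "database.password": "********",
      "database.dbname": "ORCLCDB",
      "database.service.name": "ORCLCDB",
      "database.pdb.name": "ORCLPDB1",
      "database.out.server.name": "XOUT",
      "database.tls.mode": "one-way"
    }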

Topic name

topic.prefix

Topic prefix that provides a namespace for the Oracle database server or cluster from which the connector captures changes. The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events from this connector. Only alphanumeric characters, hyphens, dots and underscores are accepted.

  • Type: string
  • Importance: high

Warning

Do not change the value of this property. If you change the value, then after a restart the connector emits subsequent events to topics whose names are based on the new value, instead of continuing to emit events to the original topics. The connector is also unable to recover its database schema history topic.
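
As an illustration, assume a hypothetical topic.prefix of cflt and a captured table SALES.ORDERS. Change events for that table would be written to a topic whose name starts with the prefix, typically of the form cflt.SALES.ORDERS; the exact name follows the connector’s topic naming rules.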

Connector configuration

table.include.list

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you want to capture. When this property is set, the connector will only capture changes from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes from all non-system tables in each captured database. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.exclude.list property.

  • Type: string
  • Importance: high
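
Because each expression is anchored, it must match the full schemaName.tableName identifier. For example, the following hypothetical setting captures the SALES.ORDERS table and every SALES table whose name starts with CUSTOMER, and nothing else:

    {
      "table.include.list": "SALES.ORDERS,SALES.CUSTOMER.*"
    }
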
table.exclude.list

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you do not want to capture. When this property is set, the connector captures changes from any table that is not specified in the exclude list. Each identifier is of the form schemaName.tableName. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.include.list property.

  • Type: string
  • Importance: medium
snapshot.mode

The criteria for running a snapshot upon startup of the connector.

  • Type: enum
  • Default: initial
  • Valid Values: initial, no_data, recovery
  • Importance: medium

You can set one of the following values:

  • initial (default): The snapshot includes both the structure (schema) and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
  • no_data: The snapshot includes only the structure (schema) of captured tables. Specify this value if you want the connector to capture data only for changes that occur after the snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
  • recovery: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth.

Warning

Do not use recovery mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.

snapshot.fetch.size

The number of rows to fetch from the database in each batch when taking a snapshot. The value is provided as a hint to the JDBC driver. A value of 0 uses the default JDBC fetch size.

  • Type: int
  • Default: 10,000
  • Importance: medium
snapshot.max.threads

Specifies the number of threads the connector uses during an initial snapshot. To enable parallel processing of the initial snapshot, set this property to a value greater than 1. With parallel snapshots, the connector processes multiple tables at the same time.

  • Type: int
  • Default: 1
  • Importance: medium
snapshot.database.errors.max.retries

Specifies the number of retry attempts the connector makes to snapshot a table if a database error occurs. This configuration property currently only retries failures related to the ORA-01466 error. By default, no retries are attempted.

  • Type: int
  • Default: 0
  • Valid Values: [0,…,3]
  • Importance: low
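
The snapshot-related properties are often tuned together. The following hypothetical fragment runs an initial snapshot with a larger fetch size, four parallel threads, and up to two retries on ORA-01466 failures; the values are illustrative, not recommendations.

    {
      "snapshot.mode": "initial",
      "snapshot.fetch.size": "20000",
      "snapshot.max.threads": "4",
      "snapshot.database.errors.max.retries": "2"
    }
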
decimal.handling.mode

Specifies how the connector should handle NUMBER, DECIMAL and NUMERIC columns.

  • Type: string
  • Default: precise
  • Valid Values: double, precise, string
  • Importance: medium

You can set one of the following values:

  • precise (default): Uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect’s org.apache.kafka.connect.data.Decimal type. Depending on the precision and scale, the most appropriate Kafka Connect integer type is used for integral values, ensuring that the value is represented without any loss of precision.
  • string: Encodes values as formatted strings. Using the string option is easier to consume, but results in a loss of semantic information about the real type.
  • double: Represents values using Java’s double. Using double values is easier, but can result in a loss of precision.
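
For example, a hypothetical NUMBER(8,2) value of 1234.56 is emitted as a binary-encoded org.apache.kafka.connect.data.Decimal with precise, as the string "1234.56" with string, and as the floating-point number 1234.56 with double.
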
time.precision.mode

Specifies how the connector should handle time, date, and timestamp columns.

  • Type: string
  • Default: adaptive
  • Valid Values: adaptive, connect
  • Importance: medium

You can set one of the following options:

  • adaptive (default): Bases the precision of time, date, and timestamp values on the database column’s precision.
  • connect: Always represents time, date, and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision.
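
As an illustration, a hypothetical TIMESTAMP(6) column is represented with microsecond precision under adaptive, because the column itself has microsecond precision, but is limited to millisecond precision under connect.
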
query.fetch.size

The number of rows to fetch from the database in each batch when more rows are needed. The value is provided as a hint to the JDBC driver. A value of 0 uses the default JDBC fetch size.

  • Type: int
  • Default: 10,000
  • Importance: medium
tombstones.on.delete

Controls whether a delete event is followed by a tombstone event. After a source record is deleted, a tombstone event (the default behavior) enables Kafka to completely delete all events that share the key of the deleted row in topics that have log compaction enabled.

  • Type: boolean
  • Default: true
  • Importance: medium

The following values are possible:

  • true: For each delete operation, the connector emits a delete event and a subsequent tombstone event.
  • false: For each delete operation, the connector emits only a delete event.
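
For example, when a row with a hypothetical primary key of id=42 is deleted, the connector first emits a delete event keyed by id=42 and then, with the default setting of true, a second record with the same key and a null value (the tombstone). With false, only the delete event is emitted.
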
skipped.operations

A comma-separated list of the operation types to skip during streaming.

  • Type: string
  • Default: t
  • Importance: low

You can configure the connector to skip the following types of operations:

  • c (create/insert)
  • u (update)
  • d (delete)
  • t (truncate)

Use none to indicate that no operations are skipped. By default, only truncate (t) operations are skipped.
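
For example, a hypothetical configuration that skips truncate and delete operations, while still emitting inserts and updates, looks like this:

    {
      "skipped.operations": "t,d"
    }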

schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector.

  • Type: string
  • Default: none
  • Valid Values: avro, avro_unicode, none
  • Importance: low

The following values are possible:

  • none (default): Does not apply any adjustment.
  • avro: Replaces the characters that cannot be used in the Avro type name with underscores.
  • avro_unicode: Replaces underscores or characters that cannot be used in the Avro type name with the corresponding Unicode escape, such as _uxxxx. Note: _ is an escape sequence, like backslash in Java.
field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter used by the connector.

  • Type: string
  • Default: none
  • Valid Values: avro, avro_unicode, none
  • Importance: low

The following values are possible:

  • none (default): Does not apply any adjustment.
  • avro: Replaces the characters that cannot be used in the Avro type name with underscores.
  • avro_unicode: Replaces underscores or characters that cannot be used in the Avro type name with the corresponding Unicode escape, such as _uxxxx.

Note

_ is an escape sequence like backslash in Java.
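
As an illustration, assume a hypothetical column named order-total. With avro, the corresponding field name is adjusted to order_total; with avro_unicode, the invalid character is instead replaced with its Unicode escape (for example, order_u002dtotal); with none, the name is passed through unchanged and may be rejected by an Avro-based converter.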

schema.history.internal.kafka.topic

The name of the topic for the database schema history. A new topic with the provided name is created if it doesn’t already exist. If the topic already exists, ensure that it has a single partition, an infinite retention period, and is not in use by any other connector.

  • Type: string
  • Default: NA
  • Importance: high
schema.history.internal.kafka.bootstrap.servers

A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving database schema history previously stored by the connector. This should point to the same Kafka cluster used by the Kafka Connect process.

  • Type: string
  • Default: NA
  • Importance: high
schema.history.internal.skip.unparseable.ddl

A Boolean value that specifies whether the connector should ignore a DDL statement that it cannot parse, or stop processing so that a human can address the issue. The safe default is false, which causes the connector to fail when it encounters an unparseable DDL statement. Set the value to true with care, because it causes the connector to skip any DDL statement it cannot parse, which could potentially lead to schema mismatches and data loss.

  • Type: boolean
  • Default: false
  • Importance: low
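
The schema history properties are typically set together. The following hypothetical fragment stores the history in a dedicated topic on the same Kafka cluster that backs the Connect worker; the topic name and bootstrap servers are placeholder values.

    {
      "schema.history.internal.kafka.topic": "__orders-schema-history",
      "schema.history.internal.kafka.bootstrap.servers": "broker1:9092,broker2:9092"
    }
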
topic.heartbeat.prefix

Controls the name of the topic to which the connector sends heartbeat messages. The heartbeat topic name follows this format: ${topic.heartbeat.prefix}.${topic.prefix}.

  • Type: string
  • Default: __cflt-oracle-heartbeat
  • Importance: low
heartbeat.interval.ms

Controls how often the connector sends heartbeat messages to a heartbeat topic. This is useful in situations where no changes occur in the captured tables for an extended period. In such cases, no change event messages are generated, causing the committed source offset to remain unchanged. As a result, the connector is unable to update the processed low watermark on the outbound server, which could result in the database retaining archived redo log files longer than needed. The default value is 0, which disables the heartbeat mechanism.

  • Type: int
  • Default: 0
  • Valid Values: [0,…]
  • Importance: medium
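
For example, a hypothetical configuration that emits a heartbeat every five minutes would look like the following fragment. With a topic.prefix of cflt and the default heartbeat prefix, the heartbeat topic name would be __cflt-oracle-heartbeat.cflt.

    {
      "topic.prefix": "cflt",
      "heartbeat.interval.ms": "300000"
    }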