Configuration Reference for Oracle XStream CDC Source Connector for Confluent Platform¶
The sections in this page define each of the Oracle XStream CDC Source connector configuration properties.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see Oracle XStream CDC Source Connector for Confluent Cloud.
Connector¶
name
Unique name for the connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
connector.class
Java class name for the connector. It is always set to io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector.
- Type: string
- Default: NA
- Importance: high
tasks.max
The maximum number of tasks for this connector. This configuration does not apply to the Oracle connector because it always operates with a single task.
- Type: int
- Default: 1
- Valid Values: [1,…,1]
- Importance: high
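For orientation, here is a minimal sketch of how these connector-level properties appear at the top of a connector configuration submitted to Kafka Connect. The connector name is a hypothetical placeholder; the remaining required properties are covered in the sections below.

```json
{
  "name": "oracle-xstream-source",
  "config": {
    "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
    "tasks.max": "1"
  }
}
```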
Database¶
database.hostname
IP address or hostname of the Oracle database server.
- Type: string
- Valid Values: Must match the regex
^[a-zA-Z0-9-_.]+$
- Importance: high
database.port
Port number of the Oracle database server.
- Type: int
- Default: 1521
- Valid Values: [1,…,65535]
- Importance: high
database.user
Name of the Oracle database user to use when connecting to the database.
- Type: string
- Importance: high
database.password
Password of the Oracle database user to use when connecting to the database.
- Type: password
- Importance: high
database.dbname
Name of the database to connect to. In a multitenant architecture, this is the name of the container database (CDB).
- Type: string
- Importance: high
database.service.name
Name of the database service to which to connect. In a multitenant container database, this is the service used to connect to the CDB. For Oracle Real Application Clusters (RAC), use the service created by Oracle XStream.
- Type: string
- Importance: high
database.pdb.name
Name of the pluggable database to connect to in a multitenant architecture. The container database (CDB) name must be given via database.dbname in this case. This configuration should not be specified when connecting to a non-container database.
- Type: string
- Importance: high
database.out.server.name
Name of the XStream outbound server to connect to.
- Type: string
- Importance: high
database.os.timezone
Specifies the database server’s operating system timezone. This is used to read the time when the LCR was generated at the source database. The default timezone is UTC. The value has to be a valid java.time.ZoneId identifier.
- Type: string
- Default: UTC
- Importance: low
database.processor.licenses
Specifies the number of Oracle processor licenses required for the source database server or cluster. This is determined by multiplying the total number of processor cores by the core processor licensing factor, as specified in the Oracle Processor Core Factor Table.
- Type: int
- Default: NA
- Importance: medium
database.tls.mode
Specifies whether to use Transport Layer Security (TLS) to connect to the Oracle database.
- Type: enum
- Default: disable
You can set one of the following values:
- disable (default): Does not use a TLS connection.
- one-way: Uses a TLS encrypted connection and also verifies the server’s TLS certificate against the configured Certificate Authority (CA) certificates.
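The following sketch shows how the database connection properties might be combined for a multitenant (CDB/PDB) deployment. The hostname, user, password, database, service, and outbound server names are hypothetical placeholders; only the property keys come from this reference.

```json
{
  "database.hostname": "oracle.example.com",
  "database.port": "1521",
  "database.user": "C##CFLTUSER",
  "database.password": "<password>",
  "database.dbname": "ORCLCDB",
  "database.service.name": "ORCLCDB",
  "database.pdb.name": "ORCLPDB1",
  "database.out.server.name": "XOUT",
  "database.os.timezone": "UTC",
  "database.tls.mode": "one-way"
}
```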
Topic name¶
topic.prefix
Topic prefix that provides a namespace for the Oracle database server or cluster from which the connector captures changes. The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events from this connector. Only alphanumeric characters, hyphens, dots and underscores are accepted.
- Type: string
- Importance: high
Warning
Do not change the value of this property. If you change the value, then after a restart the connector stops emitting events to the original topics and instead emits subsequent events to topics whose names are based on the new value. The connector is also unable to recover its database schema history topic.
Connector configuration¶
table.include.list
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you want to capture. When this property is set, the connector will only capture changes from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes from all non-system tables in each captured database. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.exclude.list property.
- Type: string
- Importance: high
table.exclude.list
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you do not want to capture. When this property is set, the connector captures changes from any table that is not specified in the exclude list. Each identifier is of the form schemaName.tableName. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.include.list property.
- Type: string
- Importance: medium
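As an illustration of the topic prefix and the anchored-regex table filtering described above, the following hypothetical fragment captures three specific tables. Because each expression is matched against the entire schemaName.tableName identifier, SALES\.ORDERS does not also match a table named SALES.ORDERS_ARCHIVE.

```json
{
  "topic.prefix": "orcl",
  "table.include.list": "SALES\\.ORDERS,SALES\\.CUSTOMERS,HR\\.EMPLOYEES"
}
```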
snapshot.mode
The criteria for running a snapshot upon startup of the connector.
- Type: enum
- Default: initial
- Valid Values: initial, no_data, recovery
- Importance: medium
You can set one of the following values:
- initial (default): The snapshot includes both the structure (schema) and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
- no_data: The snapshot includes only the structure (schema) of captured tables. Specify this value if you want the connector to capture data only for changes that occur after the snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
- recovery: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth.
Warning
Do not use recovery mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.
snapshot.fetch.size
The number of rows to fetch from the database in each batch while taking a snapshot, provided as a hint to the JDBC driver. A value of 0 uses the default JDBC fetch size.
- Type: int
- Default: 10,000
- Importance: medium
snapshot.max.threads
Specifies the number of threads the connector uses during an initial snapshot. To enable parallel processing of the initial snapshot, set this property to a value greater than 1. With parallel snapshots, the connector processes multiple tables at the same time.
- Type: int
- Default: 1
- Importance: medium
snapshot.database.errors.max.retries
Specifies the number of retry attempts the connector makes to snapshot a table if a database error occurs. Currently, the connector only retries failures related to the ORA-01466 error. By default, no retries are attempted.
- Type: int
- Default: 0
- Valid Values: [0,…,3]
- Importance: low
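A hypothetical snapshot configuration combining the properties above might look like the following. It requests a schema-plus-data snapshot, processes up to four tables in parallel, and retries ORA-01466 failures up to two times.

```json
{
  "snapshot.mode": "initial",
  "snapshot.fetch.size": "10000",
  "snapshot.max.threads": "4",
  "snapshot.database.errors.max.retries": "2"
}
```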
decimal.handling.mode
Specifies how the connector should handle NUMBER, DECIMAL, and NUMERIC columns.
- Type: string
- Default: precise
- Valid Values: double, precise, string
- Importance: medium
You can set one of the following values:
- precise (default): Uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect’s org.apache.kafka.connect.data.Decimal type. Depending on the precision and scale, the most appropriate Kafka Connect integer type is used for integral values, ensuring that the value is represented without any loss of precision.
- string: Encodes values as formatted strings. Using the string option is easier to consume, but results in a loss of semantic information about the real type.
- double: Represents values using Java’s double. Using double values is easier, but can result in a loss of precision.
time.precision.mode
Specifies how the connector should handle time, date, and timestamp columns.
- Type: string
- Default: adaptive
- Valid Values: adaptive, connect
- Importance: medium
You can set one of the following options:
- adaptive (default): Bases the precision of time, date, and timestamp values on the database column’s precision.
- connect: Always represents time, date, and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision.
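For example, a configuration that trades precision for easier downstream consumption might combine the two modes as in the following hypothetical fragment, emitting numeric columns as formatted strings while keeping the database-driven time precision.

```json
{
  "decimal.handling.mode": "string",
  "time.precision.mode": "adaptive"
}
```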
query.fetch.size
The number of rows to fetch from the database in each batch when more rows are needed, provided as a hint to the JDBC driver. A value of 0 uses the default JDBC fetch size.
- Type: int
- Default: 10,000
- Importance: medium
tombstones.on.delete
Controls whether a delete event is followed by a tombstone event. After a source record is deleted, a tombstone event (the default behavior) enables Kafka to completely delete all events that share the key of the deleted row in topics that have log compaction enabled.
- Type: boolean
- Default: true
- Importance: medium
The following values are possible:
- true: For each delete operation, the connector emits a delete event and a subsequent tombstone event.
- false: For each delete operation, the connector emits only a delete event.
skipped.operations
A comma-separated list of the operation types to skip during streaming.
- Type: string
- Default: t
- Importance: low
You can configure the connector to skip the following types of operations:
- c (create/insert)
- u (update)
- d (delete)
- t (truncate)
Use none to indicate that no operations are skipped. By default, only truncate (t) operations are skipped.
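The following hypothetical fragment illustrates the delete and streaming-filter behavior described above: tombstone events are suppressed, and both truncate and delete operations are skipped during streaming.

```json
{
  "tombstones.on.delete": "false",
  "skipped.operations": "t,d"
}
```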
schema.name.adjustment.mode
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: low
The following values are possible:
- none (default): Does not apply any adjustment.
- avro: Replaces characters that cannot be used in the Avro type name with an underscore.
- avro_unicode: Replaces underscores or characters that cannot be used in the Avro type name with the corresponding Unicode escape, such as _uxxxx. Note: _ is an escape sequence, similar to a backslash in Java.
field.name.adjustment.mode
Specifies how field names should be adjusted for compatibility with the message converter used by the connector.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: low
The following values are possible:
- none (default): Does not apply any adjustment.
- avro: Replaces characters that cannot be used in the Avro type name with an underscore.
- avro_unicode: Replaces underscores or characters that cannot be used in the Avro type name with the corresponding Unicode escape, such as _uxxxx.
Note
_ is an escape sequence, similar to a backslash in Java.
schema.history.internal.kafka.topic
The name of the topic for the database schema history. A new topic with the provided name is created if it doesn’t already exist. If the topic already exists, ensure that it has a single partition, an infinite retention period, and is not in use by any other connector.
- Type: string
- Default: NA
- Importance: high
schema.history.internal.kafka.bootstrap.servers
A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving database schema history previously stored by the connector. This should point to the same Kafka cluster used by the Kafka Connect process.
- Type: string
- Default: NA
- Importance: high
schema.history.internal.skip.unparseable.ddl
A boolean value that specifies whether the connector should ignore a DDL statement that cannot be parsed, or stop processing so that a human can address the issue. The safe default is false, which causes the connector to fail when it encounters an unparseable DDL statement. Set the value to true with care, because it causes the connector to skip any DDL statement it cannot parse, which could potentially lead to schema mismatches and data loss.
- Type: boolean
- Default: false
- Importance: low
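A hypothetical schema history configuration might look like the following. The topic name and bootstrap server values are placeholders; the bootstrap servers should point at the same Kafka cluster used by the Kafka Connect process.

```json
{
  "schema.history.internal.kafka.topic": "__orcl-schema-history",
  "schema.history.internal.kafka.bootstrap.servers": "broker1:9092,broker2:9092",
  "schema.history.internal.skip.unparseable.ddl": "false"
}
```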
topic.heartbeat.prefix
Controls the name of the topic to which the connector sends heartbeat messages. The heartbeat topic name follows this format: ${topic.heartbeat.prefix}.${topic.prefix}.
- Type: string
- Default: __cflt-oracle-heartbeat
- Importance: low
heartbeat.interval.ms
Controls how often the connector sends heartbeat messages to a heartbeat topic. It is useful in situations when no changes occur in the captured tables for an extended period. In such cases, there are no change event messages generated, causing the committed source offset to remain unchanged. As a result, the connector is unable to update the processed low watermark on the outbound server, which could result in the database retaining archived redo log files longer than needed. The default value is 0, which disables the heartbeat mechanism.
- Type: int
- Default: 0
- Valid Values: [0,…]
- Importance: medium
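For example, with the default heartbeat prefix and a hypothetical topic.prefix of orcl, enabling a five-minute heartbeat as sketched below would cause the connector to write heartbeat messages to a topic named __cflt-oracle-heartbeat.orcl.

```json
{
  "topic.prefix": "orcl",
  "heartbeat.interval.ms": "300000"
}
```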