Configuration Reference for Oracle CDC Source Connector for Confluent Platform

The sections on this page define each of the Oracle CDC Source connector configuration properties and group them by level of importance.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see Oracle CDC Source Connector for Confluent Cloud.

High importance

oracle.server

The host name or address for the Oracle server.

  • Type: string
  • Valid Values: Valid IPv4/IPv6 address or host name
  • Importance: high
oracle.port

The port number used to connect to Oracle.

  • Type: int
  • Default: 1521
  • Valid Values: 1 to 65535 (inclusive)
  • Importance: high
oracle.username

The name of the Oracle database user.

  • Type: string
  • Importance: high
oracle.password

The password for the Oracle database user.

  • Type: string
  • Importance: high
oracle.sid

The Oracle system identifier (SID) of a multitenant container database (CDB) or non-multitenant database where tables reside. Confluent recommends you use oracle.service.name to connect to the database using service names instead of using the SID. Maps to the SID parameter in the connect descriptor.

  • Type: string
  • Valid Values: Non-empty string
  • Importance: high
oracle.pdb.name

The name of the pluggable database (PDB). This is not required in the case where tables reside in the CDB$ROOT database, or if you’re using a non-container database.

  • Type: string
  • Valid Values: Non-empty string
  • Importance: high
oracle.service.name

The Oracle service name. If set, the connector always connects to the database using the provided service name. The oracle.service.name maps to the SERVICE_NAME parameter in the connect descriptor. For the multitenant container database (CDB) or non-multitenant database, this does not need to be specified.

Important

  • Confluent recommends you set the oracle.service.name to the container database (CDB) service name when using a pluggable database (PDB).
  • When this property is set, it is used in the connect descriptor instead of oracle.sid.
  • Type: string
  • Default: “”
  • Importance: high
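
For reference, here is a minimal sketch of the connection-related properties above for a CDB named ORCLCDB with a PDB named ORCLPDB1; the host, credentials, and names are placeholders, not values from an actual environment:

# Placeholder connection settings for a CDB named ORCLCDB with a PDB named ORCLPDB1
oracle.server=oracle.example.com
oracle.port=1521
oracle.username=C##MYUSER
oracle.password=my-secret-password
oracle.sid=ORCLCDB
oracle.pdb.name=ORCLPDB1
# Recommended when using a PDB: set the service name to the CDB service name
oracle.service.name=ORCLCDB
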
key.converter

Converter class used to serialize and format all record keys written to Kafka and deserialize all record keys read from Kafka. Examples of common formats include JSON, Avro, and Protobuf. The converter may require more key.converter.* properties. For more details, see Configuring Key and Value Converters.

value.converter

Converter class used to serialize and format all record values written to Kafka and deserialize all record values read from Kafka. Examples of common formats include JSON, Avro, and Protobuf. A valid converter must be specified in the connector configuration to capture change events to the table change event topics. The converter may require more value.converter.* properties. For more details, see Configuring Key and Value Converters.

table.inclusion.regex

The regular expression that matches the fully-qualified names of the tables to capture. The values are matched (case sensitive) against the object names stored in the data dictionary. (Object names are stored in uppercase unless created as quoted identifiers; database and PDB names are always stored in uppercase in the data dictionary.) Ensure that the casing of the SID portion of the identifier is consistent with the oracle.sid value specified. That is, if the SID is lowercase on the database side, the SID section of the regex must also be lowercase, and the same principle applies to uppercase values. For example, if the SID is orclcdb, the regex should be orclcdb[.]C##MYUSER[.]table1, and if the SID is ORCLCDB, the regex should be ORCLCDB[.]C##MYUSER[.]table1.

  • For a non-container database, the fully-qualified name includes the SID and schema name (for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\\.MYUSER\\.(ORDERS|CUSTOMERS)).
  • For a multitenant database (CDB), the fully-qualified name includes the SID and schema name (for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).
  • For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name (for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).

For more details, see Regex Pattern Issues.

  • Type: string
  • Valid Values: A valid regular expression or a blank string when the table.topic.name.template is blank.
  • Importance: high
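
As a sketch, assuming a CDB with SID ORCLCDB and a schema C##MYUSER (the same placeholder names used in the examples above), an inclusion pattern for two tables could look like this:

# Capture only the ORDERS and CUSTOMERS tables owned by C##MYUSER in the ORCLCDB database
table.inclusion.regex=ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS)
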
table.exclusion.regex

The regular expression that matches the fully-qualified names of the tables to exclude from capture. The values are matched (case sensitive) against the object names stored in the data dictionary. (Object names are stored in uppercase unless created as quoted identifiers; database and PDB names are always stored in uppercase in the data dictionary.) Ensure that the casing of the SID portion of the identifier is consistent with the oracle.sid value specified. That is, if the SID is lowercase on the database side, the SID section of the regex must also be lowercase, and the same principle applies to uppercase values. For example, if the SID is orclcdb, the regex should be orclcdb[.]C##MYUSER[.]table1, and if the SID is ORCLCDB, the regex should be ORCLCDB[.]C##MYUSER[.]table1.

  • For a non-container database, the fully-qualified name includes the SID and schema name (for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\\.MYUSER\\.(ORDERS|CUSTOMERS)).
  • For a multitenant database (CDB), the fully-qualified name includes the SID and schema name (for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).
  • For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name (for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).

For more details, see Regex Pattern Issues.

  • Type: string
  • Default: blank
  • Valid Values: A blank string or a valid regular expression.
  • Importance: high
table.topic.name.template

The template that defines the name of the Kafka topic where the change event is written. The value can be a constant if the connector writes all change events from all captured tables to one topic. Leave this blank only if the connector should write events to the redo log topic and not to the table change event topics. Special characters, including \, $, {, and }, must be escaped with \ when not intended to be part of a template variable.

  • Type: string
  • Default: ${fullyQualifiedTableName}
  • Valid Values: Template variables which resolve to a valid topic name or a blank string. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
  • Importance: high
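
For instance, a sketch that keeps the default per-table naming but adds a fixed prefix; the cdc prefix is purely illustrative:

# Write each captured table to its own topic, prefixed with an illustrative namespace
table.topic.name.template=cdc.${fullyQualifiedTableName}
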
redo.log.topic.name

The template for the name of the Kafka topic where the connector records all redo log events.

  • Type: string
  • Default: ${connectorName}-${databaseName}-redo-log
  • Valid Values: Template variables which resolve to a valid topic name. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
  • Importance: high
redo.log.consumer.bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster with the redo log topic. This list is only used to connect to the initial hosts to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... This list does not need to contain the full set of servers. You should enter more than one host/port pair in case a server is offline. A valid list of bootstrap servers must be specified for the redo log consumers to read events from the redo log topic and write change events to the table change event topics. This is not required if table.topic.name.template is set to the empty string.

  • Type: string
  • Valid Values: Kafka cluster server host/port pairs
  • Importance: high
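
A sketch of this property with placeholder broker addresses:

# Placeholder broker addresses for the Kafka cluster that hosts the redo log topic
redo.log.consumer.bootstrap.servers=broker1:9092,broker2:9092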

Medium importance

start.from

The position in the redo log where the connector should start when it runs for the first time. Specify an Oracle System Change Number (SCN), a database timestamp with format yyyy-MM-dd HH:mm:SS in the database time zone, or one of the literal values described below.

  • Type: string
  • Default: snapshot
  • Valid values: An SCN, a database timestamp, snapshot, current, or force_current
    • snapshot: This instructs the connector to perform an initial snapshot of each captured table prior to capturing changes.
    • current: This may be used to instruct the connector to start from the current Oracle SCN without snapshotting.
    • force_current: This is the same as current, but it ignores any previously stored offsets when the connector is restarted. Use this option with caution because it can result in losing changes between the SCN stored in the offsets and the current SCN. Use it only to recover the connector when the SCN stored in the offsets is no longer available in the Oracle archive logs. Every option other than force_current causes the connector to resume from the stored offsets on task restarts or rebalances.
  • Importance: medium
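
A brief sketch of the documented forms; the SCN and timestamp values are hypothetical:

# Start from a specific SCN (hypothetical value)
start.from=1891234
# Or start from a database timestamp in the database time zone
# start.from=2023-01-15 10:30:00
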
key.template

The template that defines the Kafka record key for each change event. By default the record key contains a concatenated primary key value delimited by an underscore (_) character. Use ${primaryKeyStructOrValue} to contain either the sole column value of a single-column primary key or a STRUCT with the multi-column primary key fields (or null if the table has no primary or unique key). Use ${primaryKeyStruct} to always use a STRUCT for primary keys that have one or more columns (or null if there is no primary or unique key). If the template contains variables or string literals, the record key is the string with the variables resolved and replaced.

  • Type: string
  • Default: ${primaryKeyStructOrValue}
  • Importance: medium
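
A minimal sketch showing the two documented template variables; the second line is commented out as an alternative:

# Default: the sole key column value, or a STRUCT for multi-column primary keys
key.template=${primaryKeyStructOrValue}
# Alternative: always use a STRUCT, even for single-column primary keys
# key.template=${primaryKeyStruct}
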
max.batch.size

The maximum number of records to be returned in a single batch from the connector to Kafka Connect. The connector may return fewer records if no additional records are available.

  • Type: int
  • Default: 1000
  • Importance: medium
query.timeout.ms

The timeout in milliseconds (ms) for any query submitted to Oracle. The default is 5 minutes (300000 milliseconds). If set to a negative value, the connector does not enforce a timeout on queries.

  • Type: long
  • Default: 300000
  • Importance: medium
max.retry.time.ms

The maximum total time in milliseconds (ms) the connector tries to reconnect after receiving a retriable exception. A non-positive value disables retries. The default is 86400000 ms (24 hours).

  • Type: long
  • Default: 86400000
  • Importance: medium
redo.log.poll.interval.ms

The interval between polls to retrieve the database logs. This has no effect when using Oracle database versions prior to 19c.

  • Type: int
  • Default: 1000
  • Importance: medium
snapshot.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching table rows in a snapshot. A value of 0 disables this hint.

  • Type: int
  • Default: 2000
  • Importance: medium
redo.log.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. A value of 0 disables this hint. When continuous mine is available (database versions prior to Oracle 19c), the mining query from the connector waits until the number of rows available from the redo log is at least the value specified for fetch size before returning the results.

  • Type: int
  • Default: 10 for database versions prior to Oracle 19c and 1000 for other database versions
  • Importance: medium
poll.linger.ms

The maximum time to wait for a record before returning an empty batch. The call to poll can return early before poll.linger.ms expires if max.batch.size records are received.

  • Type: long
  • Default: 5000
  • Importance: medium
snapshot.threads.per.task

The number of threads that can be used in each task to perform snapshots. This is only useful if the number of tables assigned to a task is greater than this value.

  • Type: int
  • Default: 4
  • Importance: medium
heartbeat.interval.ms

The interval at which the connector emits heartbeat events to an internal heartbeat topic (configured using heartbeat.topic.name). The default value of heartbeat.interval.ms is 0, which disables the heartbeat mechanism. Confluent recommends you set heartbeat.interval.ms to a value on the order of minutes to hours in environments where the connector captures tables that are infrequently updated, so the source offsets can move forward. Otherwise, a task restart could cause the connector to fail with an ORA-01291 missing logfile error if the archived redo log file corresponding to the stored source offset has been purged from the database. For more information about heartbeats, see Infrequently Updated Databases.

  • Type: int
  • Default: 0
  • Importance: medium
heartbeat.topic.name

The name of the Kafka topic to which the connector will emit heartbeat events at regular intervals configured using heartbeat.interval.ms. Note that if heartbeat.interval.ms is set to 0, the connector will not send any heartbeat messages and the topic won’t be created.

  • Type: string
  • Default: ${connectorName}-${databaseName}-heartbeat-topic
  • Valid Values: Template variables which resolve to a valid topic name. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
  • Importance: medium
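
A sketch of a heartbeat setup for an infrequently updated database; the one-hour interval is illustrative, not a recommendation:

# Emit a heartbeat once an hour so source offsets keep moving forward
heartbeat.interval.ms=3600000
# Optionally override the default heartbeat topic name template
heartbeat.topic.name=${connectorName}-${databaseName}-heartbeat-topic
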
use.transaction.begin.for.mining.session

Set the start SCN for a log mining session to the start SCN of the oldest relevant open transaction, if one exists. A relevant transaction is one that contains changes to tables that the connector is set up to capture. Confluent recommends you set this to true when connecting to Oracle Real Application Clusters (RAC) databases, or if large object (LOB) data type support is enabled (using enable.large.lob.object.support). This configuration is applicable only for Oracle database versions 19c and later.

  • Type: boolean
  • Default: false
  • Importance: medium
log.mining.transaction.age.threshold.ms

Specifies the threshold (in milliseconds) for the transaction age. Transaction age is defined as the amount of time the transaction has been open on the database. If the transaction age exceeds this threshold, an action is taken depending on the value set in log.mining.transaction.threshold.breached.action. The default value is -1, which means that a transaction is retained in the buffer until the connector receives the commit or rollback event for the transaction. This configuration is applicable only when use.transaction.begin.for.mining.session is set to true.

  • Type: long
  • Default: -1
  • Importance: medium
log.mining.transaction.threshold.breached.action

Specifies the action to take when an active transaction exceeds the threshold defined using the log.mining.transaction.age.threshold.ms configuration.

  • Type: string
  • Default: warn
  • Valid values: discard and warn
    • discard: Drop long running transactions that exceed the threshold age from the buffer and skip emitting any records associated with these transactions.
    • warn: Log a warning mentioning the oldest transaction that exceeds the threshold.
  • Importance: medium
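
As an illustration, the following sketch discards transactions that have been open for more than one hour; the threshold value is an assumption, not a recommendation:

# Required for transaction age tracking (Oracle 19c and later)
use.transaction.begin.for.mining.session=true
# Treat transactions open for more than one hour as long running
log.mining.transaction.age.threshold.ms=3600000
# Drop long-running transactions instead of only logging a warning
log.mining.transaction.threshold.breached.action=discard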

Low importance

redo.log.corruption.topic

The name of the Kafka topic where the connector records events that describe errors and corrupted portions of the Oracle redo log (missed data). This can optionally use the template variables ${connectorName}, ${databaseName}, and ${schemaName}. A blank topic name (the default) designates that this information is not written to Kafka.

  • Type: string
  • Default: blank
  • Importance: low
redo.log.consumer.fetch.min.bytes

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for that minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 causes the server to wait for a larger amount of data to accumulate, which can improve server throughput at the cost of additional latency.

  • Type: int
  • Default: 1
  • Importance: low
redo.log.consumer.max.partition.fetch.bytes

The maximum amount of per-partition data the server will return. Records are fetched in batches by the consumer. If the first record batch (in the first fetched non-empty partition) is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. (This is not an absolute maximum.) The maximum record batch size accepted by the broker is defined using message.max.bytes in the Kafka broker configuration or max.message.bytes in the topic configuration. See fetch.max.bytes for limiting the consumer request size.

  • Type: int
  • Default: 1048576
  • Importance: low
redo.log.consumer.fetch.max.bytes

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch (in the first non-empty partition of the fetch) is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. (This is not an absolute maximum.) The maximum record batch size accepted by the broker is defined using message.max.bytes in the Kafka broker configuration or max.message.bytes in the topic configuration. Note that the consumer performs multiple fetches in parallel.

  • Type: long
  • Default: 52428800
  • Importance: low
redo.log.consumer.max.poll.records

The maximum number of records returned in a single call to poll().

  • Type: int
  • Default: 500
  • Importance: low
redo.log.consumer.request.timeout.ms

Controls the maximum amount of time the client waits for the request response. If the response is not received before the timeout elapses, the client resends the request (if necessary) or fails the request if retries are exhausted.

  • Type: int
  • Default: 30000
  • Importance: low
redo.log.consumer.receive.buffer.bytes

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the Operating System default buffer size is used.

  • Type: int
  • Default: 65536
  • Importance: low
redo.log.consumer.send.buffer.bytes

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.

  • Type: int
  • Default: 131072
  • Importance: low
behavior.on.dictionary.mismatch

Specifies the desired behavior when the connector is not able to parse the value of a column due to a dictionary mismatch caused by a DDL statement. This can happen if the online dictionary mode is specified but the connector is streaming historical data recorded before DDL changes occurred. The default option, fail, causes the connector task to fail. The log option logs the unparsable statement and skips the problematic record without failing the connector task.

  • Type: string
  • Default: fail
  • Importance: low
behavior.on.unparsable.statement

Specifies the desired behavior when the connector encounters a SQL statement that could not be parsed. The default option fail will cause the connector task to fail. The log option will log the unparsable statement and skip the problematic record without failing the connector task.

  • Type: string
  • Default: fail
  • Importance: low
oracle.dictionary.mode

The dictionary handling mode used by the connector. See Address DDL Changes in Oracle Database for Confluent Platform for more information.

  • Type: string
  • Default: auto
  • Valid Values: auto, online, or redo_log
    • auto: The connector uses the dictionary from the online catalog until it encounters a DDL statement that evolves the table schema, at which point it starts using the dictionary from archived redo logs. Once the DDL statement has been processed, the connector reverts to using the online catalog. Use this mode if DDL statements are expected.
    • online: The connector always uses the online dictionary catalog. Use online mode if no DDL statements are expected, as DDL statements aren’t supported in online mode.
    • redo_log: The connector always uses the dictionary catalog from archived redo logs. Use this mode if you cannot access the online redo log.
  • Importance: low
log.mining.archive.destination.name

The name of the archive log destination to use when mining archived redo logs. You can configure the connector to use a specific destination using this property (for example, LOG_ARCHIVE_DEST_2). This is only applicable for Oracle database versions 19c and later.

  • Type: string
  • Default: “”
  • Importance: low
record.buffer.mode

Where to buffer records that are part of the transaction but may not yet be committed.

Important

Database record buffering mode is not supported in Oracle Database version 19c and later. Use connector record buffering mode instead.

  • Type: string
  • Default: connector
  • Valid Values: connector, or database
    • connector: buffer uncommitted transactions in connector memory.
    • database: buffer the records in the database. This option uses the COMMITTED_DATA_ONLY flag to LogMiner so that the connector only receives committed records. Transactions that are rolled back or in progress are filtered out, as are internal redo records. Use this option if the worker running the redo log processing task (task 0) is memory constrained and you would rather buffer in the database. Note, though, that this option increases database memory usage to stage all redo records within a single transaction in memory until LogMiner finds the commit record for that transaction. Therefore, it is possible to exhaust memory.
  • Importance: low
max.batch.timeout.ms

The maximum time to wait for a record before returning an empty batch. Must be at least 1000 milliseconds (one second). The default is 60000 milliseconds (one minute).

Important

  • Not in active support. Use poll.linger.ms instead.
  • Type: long
  • Default: 60000
  • Importance: low
max.buffer.size

The maximum number of records from all snapshot threads and from the redo log that can be buffered into batches. The default 0 means a buffer size will be computed from the maximum batch size (max.batch.size) and number of threads (snapshot.threads.per.task).

  • Type: int
  • Default: 0
  • Importance: low
lob.topic.name.template

The template that defines the name of the Kafka topic where the connector writes LOB objects. The value can be a constant if the connector writes all LOB objects from all captured tables to one topic. Or, the value can include any supported template variables (for example, ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, and ${connectorName}). The default is empty, which ignores all LOB type columns if any exist on captured tables. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. Any character that is not valid in a topic name is replaced by an underscore in the topic name.

  • Type: string
  • Default: “”
  • Valid Values: Template variables which resolve to a valid topic name or a blank string. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
  • Importance: low
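
A sketch that routes each LOB column to its own topic using the supported template variables listed above:

# Write each LOB column to its own topic, one per database, schema, table, and column
lob.topic.name.template=${databaseName}.${schemaName}.${tableName}.${columnName}
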
enable.large.lob.object.support

If true, the connector will support large LOB objects that are split across multiple redo log records. The connector will emit commit messages to the redo log topic and use these commit messages to track when a large LOB object can be emitted to the LOB topic.

  • Type: boolean
  • Default: false
  • Importance: low
numeric.mapping

Map NUMERIC values by precision and optionally scale to primitive or decimal types.

  • Use none if all NUMERIC columns are to be represented by Connect’s DECIMAL logical type.
  • Use best_fit_or_decimal if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s DECIMAL logical type will be used instead, and the values will be represented in binary form within the change events.
  • Use best_fit_or_double if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s FLOAT64 type will be used instead.
  • Use best_fit_or_string if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s STRING type will be used instead.
  • Use precision_only to map NUMERIC columns based only on the column’s precision assuming that column’s scale is 0.

The none option is the default, but may lead to serialization issues since Connect’s DECIMAL type is mapped to its binary representation. One of the best_fit_or options will often be preferred. For backwards compatibility, the best_fit option is also available; it behaves the same as best_fit_or_decimal. Updating this property requires deleting the table topic and the registered schemas if you use a non-JSON value.converter.

  • Type: string
  • Default: none
  • Importance: low
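
For example, to avoid binary-encoded DECIMAL values in change events, you might pick one of the best_fit_or options; best_fit_or_double below is only an illustrative choice:

# Map NUMERIC columns to primitive types where possible, falling back to FLOAT64
# when the precision and scale exceed the bounds of any primitive type
numeric.mapping=best_fit_or_double
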
numeric.default.scale

The default scale to use for numeric types when the scale cannot be determined.

  • Type: int
  • Default: 127
  • Importance: low
oracle.date.mapping

Map Oracle DATE values to Connect types.

  • Use date if all the DATE columns are to be represented by Connect’s Date logical type.
  • Use timestamp if the DATE columns should be cast to Connect’s Timestamp.

The date option is the default value for backward compatibility. Despite the name similarity, the Oracle DATE type has different semantics than the Connect Date type; timestamp will often be preferred for semantic similarity.

  • Type: string
  • Default: date
  • Importance: low
emit.tombstone.on.delete

If true, delete operations emit a tombstone record with null value.

  • Type: boolean
  • Default: false
  • Importance: low
oracle.fan.events.enable

Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events will not be used even if they are supported by the database. You should only enable this feature when using an Oracle RAC setup with FAN events. Enabling the feature may cause connection issues when the database is not set up to use FAN events.

  • Type: boolean
  • Default: false
  • Importance: low
table.task.reconfig.checking.interval.ms

The interval for the background monitoring thread to examine changes to tables and reconfigure table placement if necessary. The default is 300000 milliseconds (5 minutes).

  • Type: long
  • Default: 300000
  • Importance: low
table.rps.logging.interval.ms

The interval for the background thread to log current requests per second (RPS) for each table.

  • Type: long
  • Default: 60000
  • Importance: low
log.mining.end.scn.deviation.ms

Calculates the end SCN of log mining sessions as the approximate SCN that corresponds to the point in time log.mining.end.scn.deviation.ms milliseconds before the current SCN obtained from the database. The default value is 3 seconds on RAC environments and is not set otherwise. This configuration is applicable only for Oracle database versions 19c and later. Setting this configuration to a lower value on a RAC environment introduces the potential for data loss at high load. A higher value increases the end-to-end latency for change events.

  • Type: long
  • Default: 0 for single node and 3000 for RAC environments
  • Importance: low
output.before.state.field

The name of the field in the change record written to Kafka that contains the before state of changed database rows for an update operation. A blank value signals that this field should not be included in the change records. For more details, see Before state for update operation.

  • Type: string
  • Default: “”
  • Importance: low
output.table.name.field

The name of the field in the change record written to Kafka that contains the fully-qualified name of the affected Oracle table. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the fully-qualified name of the affected Oracle table as a header with the given name.

  • Type: string
  • Default: table
  • Importance: low
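
A sketch of the two addressing styles described above; the metadata.table field name and the table header name are illustrative:

# Write the table name into a nested field of the change record value
output.table.name.field=metadata.table
# Or write it as a Kafka record header instead of a value field
# output.table.name.field=header:table
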
output.scn.field

The name of the field in the change record written to Kafka that contains the Oracle System Change Number (SCN) where this change was made. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the SCN as a header with the given name.

  • Type: string
  • Default: scn
  • Importance: low
output.commit.scn.field

The name of the field in the change record written to Kafka that contains the Oracle System Change Number (SCN) when this transaction was committed. An empty value indicates that the field should not be included in the change records.

  • Type: string
  • Default: “”
  • Importance: low
output.op.type.field

The name of the field in the change record written to Kafka that contains the operation type for this change event. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation type as a header with the given name.

  • Type: string
  • Default: op_type
  • Importance: low
output.op.ts.field

The name of the field in the change record written to Kafka that contains the operation timestamp for the change event. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation timestamp as a header with the given name.

  • Type: string
  • Default: op_ts
  • Importance: low
output.current.ts.field

The name of the field in the change record written to Kafka that contains the current timestamp of the Kafka Connect worker when this change event was processed. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the current timestamp as a header with the given name.

  • Type: string
  • Default: current_ts
  • Importance: low
output.row.id.field

The name of the field in the change record written to Kafka that contains the row ID of the changed row. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the row ID as a header with the given name.

  • Type: string
  • Default: row_id
  • Importance: low
output.username.field

The name of the field in the change record written to Kafka that contains the name of the Oracle user that executed the transaction. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the username as a header with the given name.

  • Type: string
  • Default: username
  • Importance: low
output.redo.field

The name of the field in the change record written to Kafka that contains the original redo data manipulation language (DML) statement from which this change record was created. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the redo statement as a header with the given name.

  • Type: string
  • Default: “”
  • Importance: low
output.undo.field

The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change and represents the “before” state of the row. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the undo statement as a header with the given name.

  • Type: string
  • Default: “”
  • Importance: low
output.op.type.read.value

The value of the operation type for a read (snapshot) change event. By default this is R (read).

  • Type: string
  • Default: R
  • Importance: low
output.op.type.insert.value

The value of the operation type for an insert change event. By default this is I (insert).

  • Type: string
  • Default: I
  • Importance: low
output.op.type.update.value

The value of the operation type for an update change event. By default this is U (update).

  • Type: string
  • Default: U
  • Importance: low
output.op.type.delete.value

The value of the operation type for a delete change event. By default this is D (delete).

  • Type: string
  • Default: D
  • Importance: low
output.op.type.truncate.value

The value of the operation type for a truncate change event. By default this is T (truncate).

  • Type: string
  • Default: T
  • Importance: low
redo.log.startup.polling.limit.ms

The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. When this wait time expires, the connector moves to a failed state.

  • Type: long
  • Default: 300000
  • Importance: low
snapshot.by.table.partitions

Whether the connector should perform snapshots on each table partition if the table is defined to use partitions. This is false by default, meaning that one snapshot is performed on each table in its entirety.

  • Type: boolean
  • Default: false
  • Importance: low
oracle.validation.result.fetch.size

The fetch size to be used while querying the database for validations. This is used when querying the list of tables and validating the supplemental logging level.

  • Type: int
  • Default: 5000
  • Importance: low
redo.log.row.poll.fields.include

A comma-separated list of fields from the V$LOGMNR_CONTENTS view to include in the redo log events.

  • Type: list
  • Importance: low
redo.log.row.poll.fields.exclude

A comma-separated list of fields from the V$LOGMNR_CONTENTS view to exclude from the redo log events.

  • Type: list
  • Importance: low
redo.log.row.poll.username.include

A comma-separated list of database usernames. When this property is set, the connector captures changes only from the specified set of database users. You cannot set this property along with the redo.log.row.poll.username.exclude property.

  • Type: list
  • Importance: low
redo.log.row.poll.username.exclude

A comma-separated list of database usernames. When this property is set, the connector excludes changes made by the specified set of database users. You cannot set this property along with the redo.log.row.poll.username.include property.

  • Type: list
  • Importance: low
db.timezone

Default timezone to assume when parsing Oracle DATE and TIMESTAMP types for which timezone information is not available. For example, if db.timezone=UTC, the data in both DATE and TIMESTAMP will be parsed as if in UTC timezone. The value has to be a valid java.util.TimeZone ID.

  • Type: string
  • Default: UTC
  • Importance: low
db.timezone.date

The default timezone to assume when parsing the Oracle DATE type for which timezone information is not available. If db.timezone.date is set, the value of db.timezone for the DATE type is overridden with the value in db.timezone.date. For example, if db.timezone=UTC and db.timezone.date=America/Los_Angeles, the data in TIMESTAMP will be parsed as if in the UTC timezone, and the data in DATE will be parsed as if in the America/Los_Angeles timezone. The value has to be a valid java.util.TimeZone ID.

  • Type: string
  • Importance: low
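
A sketch that mirrors the example in the description above:

# Parse TIMESTAMP values as UTC and DATE values as America/Los_Angeles
db.timezone=UTC
db.timezone.date=America/Los_Angeles
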
oracle.supplemental.log.level

Database supplemental logging level for connector operation. If set to full, the connector validates that the supplemental logging level on the database is FULL and then captures snapshots and CDC events for the specified tables whenever table.topic.name.template is not set to "". When the level is set to msl, the connector doesn’t capture CDC change events; rather, it only captures snapshots if table.topic.name.template is not set to "". Note that this setting is ignored if table.topic.name.template is set to "" because the connector will only capture redo logs. This setting defaults to the full supplemental logging level mode.

  • Type: string
  • Default: full
  • Valid Values: [msl, full]
  • Importance: low
ldap.url

The connection URL of the LDAP server if using OID-based LDAP.

  • Type: string
  • Importance: low
ldap.security.principal

The login principal or user if using SIMPLE Authentication for LDAP.

  • Type: string
  • Importance: low
ldap.security.credentials

The login password for the principal if using SIMPLE Authentication for LDAP.

  • Type: string
  • Importance: low
oracle.ssl.truststore.file

If using SSL for encryption and server authentication, set this to the location of the trust store containing server certificates that should be trusted.

  • Type: string
  • Default: “”
  • Importance: low
oracle.ssl.truststore.password

If using SSL for encryption and server authentication, the password of the trust store containing server certificates that should be trusted.

  • Type: string
  • Default: “”
  • Importance: low
oracle.kerberos.cache.file

If using Kerberos 5 authentication, set this to the location of the Kerberos 5 ticket cache file on all the Connect workers.

  • Type: string
  • Default: “”
  • Importance: low
log.sensitive.data

If set to true, the connector logs sensitive data, such as customer records, SQL queries, and exception traces containing sensitive data. Confluent recommends you set this parameter to true only in cases where logging sensitive data is acceptable and necessary for troubleshooting.

  • Type: boolean
  • Default: false
  • Importance: low
retry.error.codes

A comma-separated list of Oracle error codes (for example, 12505, 12528,...) on which the connector retries, up to the time defined by the max.retry.time.ms parameter. By default, the connector retries on recoverable or transient SQL exceptions and on certain Oracle error codes.

  • Type: list
  • Importance: low
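
A sketch using the error codes from the example above together with a hypothetical one-hour retry window:

# Retry on listener error codes 12505 and 12528 for up to one hour
retry.error.codes=12505,12528
max.retry.time.ms=3600000
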
enable.metrics.collection

If set to true, the connector records metrics that can be used to gain insight into the connector and troubleshoot issues. These metrics can be accessed using Java Management Extensions (JMX).

  • Type: boolean
  • Default: false
  • Importance: low

Confluent Platform license

confluent.topic.bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster used for licensing. All servers in the cluster will be discovered from the initial connection. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

  • Type: list
  • Importance: high
confluent.topic

Name of the Kafka topic used for Confluent Platform configuration, including licensing information.

  • Type: string
  • Default: _confluent-command
  • Importance: low
confluent.topic.replication.factor

The replication factor for the Kafka topic used for Confluent Platform configuration, including licensing information. This is used only if the topic does not already exist, and the default of 3 is appropriate for production use. If you are using a development environment with fewer than 3 brokers, you must set this to the number of brokers (often 1).

  • Type: int
  • Default: 3
  • Importance: low

Connection pool properties

The following properties are used to configure the connection pool used to share connections from the connector to the Oracle database.

connection.pool.initial.size

The number of JDBC connections created immediately upon startup, before any connection is needed. A non-zero initial size can be used to reduce the ramp-up time incurred by priming the pool to its optimal size.

  • Type: int
  • Default: 0
  • Importance: medium
connection.pool.min.size

The minimum number of available and borrowed JDBC connections in each worker’s connection pool. A connection pool always tries to return to the minimum pool size specified unless the minimum number has yet to be reached.

  • Type: int
  • Default: 0
  • Importance: medium
connection.pool.max.size

The maximum number of available and borrowed JDBC connections in each worker’s connection pool. If the maximum number of connections are borrowed (in use), no connections will be available for other uses until a borrowed connection is returned to the pool.

  • Type: int
  • Default: 1
  • Importance: medium
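
A minimal sketch of pool sizing; the values are illustrative, not recommendations:

# Open two connections at startup and keep at least two in the pool
connection.pool.initial.size=2
connection.pool.min.size=2
# Allow at most five concurrent connections for each worker
connection.pool.max.size=5
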
connection.pool.login.timeout.ms

The maximum time in milliseconds to wait for a connection to be established before failing. A value of zero specifies that the system timeout should be used.

  • Type: long
  • Default: 0
  • Importance: low
connection.pool.connection.wait.timeout.ms

Specifies how long an application request waits to obtain a connection if there are no longer any connections in the pool. A connection pool runs out of connections if all connections in the pool are being used (borrowed) and if the pool size has reached its maximum connection capacity, as specified by the maximum pool size property. This timeout feature improves overall application usability by minimizing the amount of time an application is blocked and provides the ability to implement a graceful recovery. A value of zero disables the feature.

  • Type: long
  • Default: 0
  • Importance: low
connection.pool.validation.on.borrow.enabled

Specifies whether connections are validated when they are borrowed from the connection pool.

  • Type: boolean
  • Default: true
  • Importance: low
connection.pool.validation.sql

Specifies the SQL statement to use when validating connections.

  • Type: string
  • Default: null
  • Importance: low
connection.pool.max.ttl.ms

The maximum time a borrowed connection can remain borrowed before the connection is reclaimed by the pool. A value of 0 disables the feature.

  • Type: long
  • Default: 0L
  • Importance: low
connection.pool.abandoned.timeout.ms

The maximum time in milliseconds that a borrowed connection can remain unused before the pool reclaims the connection. A value of 0 disables the feature.

  • Type: long
  • Default: 0L
  • Importance: low
connection.pool.inactive.timeout.ms

Specifies how long an available connection can remain idle before it is closed and removed from the pool. This timeout property is only applicable to available connections and does not affect borrowed connections. This property helps conserve resources that are otherwise lost on maintaining connections that are no longer being used. The inactive connection timeout (together with the maximum pool size) allows a connection pool to grow and shrink as application load changes. A value of 0 disables the feature.

  • Type: long
  • Default: 0L
  • Importance: low
connection.pool.check.interval.timeout.ms

Specifies how frequently the timeout properties (abandoned connection timeout, time-to-live connection timeout, and inactive connection timeout) are enforced. The default is 30 seconds (30000 milliseconds).

  • Type: long
  • Default: 0
  • Importance: low
connection.pool.validation.timeout.ms

The maximum duration in milliseconds for a connection validation on a borrowed connection.

  • Type: long
  • Default: 0
  • Importance: low
connection.pool.trust.idle.time.ms

The time in seconds to trust a recently used and tested database connection and skip the validation test during connection checkout.

  • Type: long
  • Default: 0
  • Importance: low
connection.pool.max.reuse.time.ms

Allows connections to be gracefully closed and removed from the pool after a connection has been in use for a specific amount of time. This property is typically used to periodically recycle connections in order to eliminate issues such as memory leaks.

  • Type: long
  • Default: 0L
  • Importance: low
connection.pool.max.reuse.count

Allows connections to be gracefully closed and removed from the connection pool after a connection has been borrowed a specific number of times. This property is typically used to periodically recycle connections in order to eliminate issues such as memory leaks.

  • Type: int
  • Default: 0
  • Importance: low
connection.pool.property.cycle.interval.ms

The time interval in milliseconds between checks to enforce connection pool timeout properties.

  • Type: long
  • Default: 0L
  • Importance: low
connection.pool.property.max.statements

The maximum number of statements to cache for each connection. Statement caching is not used when set to 0 (the default).

  • Type: int
  • Default: 0
  • Importance: low
connection.pool.harvest.trigger.count

Specifies the available connection threshold that triggers connection harvesting. For example, if the connection harvest trigger count is set to 10, then connection harvesting is triggered when the number of available connections in the pool drops to 10. A value of 2147483647 (the default) disables harvesting.

  • Type: int
  • Default: 0
  • Importance: low
connection.pool.harvest.max.count

How many borrowed connections should be returned to the pool once the harvest trigger count has been reached.

  • Type: int
  • Default: 0
  • Importance: low
connection.pool.fast.failover.enabled

Enables Fast Connection Failover (FCF) for the connection pool accessed using this pool-enabled data source. By default, FCF is disabled.

  • Type: boolean
  • Default: false
  • Importance: low
redo.log.initial.delay.interval.ms

The amount of time to wait for in-flight transactions to complete after the initial snapshot is taken, before the connector starts reading redo logs. This is only relevant on a cold start when the connector is configured to capture change events.

  • Type: long
  • Default: 0
  • Importance: low

Confluent license properties

You can put license-related properties in the connector configuration or, starting with Confluent Platform version 6.0, in the Connect worker configuration instead of in each connector configuration.

This connector is proprietary and requires a license. The license information is stored in the _confluent-command topic. If the broker requires SSL for connections, you must include the security-related confluent.topic.* properties as described below.

confluent.license

Confluent issues enterprise license keys to each subscriber. The license key is text that you can copy and paste as the value for confluent.license. A trial license allows using the connector for a 30-day trial period. A developer license allows using the connector indefinitely for single-broker development environments.

If you are a subscriber, contact Confluent Support for more information.

  • Type: string
  • Default: “”
  • Valid Values: Confluent Platform license
  • Importance: high
confluent.topic.ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Default: null
  • Importance: high
confluent.topic.ssl.truststore.password

The password for the trust store file. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.ssl.keystore.location

The location of the key store file. This is optional for the client and can be used for two-way authentication for the client.

  • Type: string
  • Default: null
  • Importance: high
confluent.topic.ssl.keystore.password

The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.ssl.key.password

The password of the private key in the key store file. This is optional for client.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.security.protocol

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

  • Type: string
  • Default: “PLAINTEXT”
  • Importance: medium

License topic configuration

A Confluent enterprise license is stored in the _confluent-command topic. This topic is created by default and contains the license that corresponds to the license key supplied through the confluent.license property. No public keys are stored in Kafka topics.

The following describes how the default _confluent-command topic is generated under different scenarios:

  • A 30-day trial license is automatically generated for the _confluent-command topic if you do not add the confluent.license property or leave this property empty (for example, confluent.license=).
  • Adding a valid license key (for example, confluent.license=<valid-license-key>) adds a valid license in the _confluent-command topic.

Here is an example of the minimal properties for development and testing.

You can change the name of the _confluent-command topic using the confluent.topic property (for instance, if your environment has strict naming conventions). The example below shows this change and the configured Kafka bootstrap server.

confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092

The example above shows the minimally required bootstrap server property that you can use for development and testing. For a production environment, you add the normal producer, consumer, and topic configuration properties to the connector properties, prefixed with confluent.topic..

License topic ACLs

The _confluent-command topic contains the license that corresponds to the license key supplied through the confluent.license property. It is created by default. Connectors that access this topic require the following ACLs configured:

  • CREATE and DESCRIBE on the resource cluster, if the connector needs to create the topic.

  • DESCRIBE, READ, and WRITE on the _confluent-command topic.

    Important

    You can also use DESCRIBE and READ without WRITE to restrict access to read-only for license topic ACLs. If a topic exists, the LicenseManager will not try to create the topic.

You can provide access either individually for each principal that will use the license or use a wildcard entry to allow all clients. The following examples show commands that you can use to configure ACLs for the resource cluster and _confluent-command topic.

  1. Set a CREATE and DESCRIBE ACL on the resource cluster:

    kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
    --add --allow-principal User:<principal> \
    --operation CREATE --operation DESCRIBE --cluster
    
  2. Set a DESCRIBE, READ, and WRITE ACL on the _confluent-command topic:

    kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
    --add --allow-principal User:<principal> \
    --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
    

Override Default Configuration Properties

You can override the replication factor using confluent.topic.replication.factor. For example, when using a Kafka cluster as a destination with fewer than three brokers (for development and testing), you should set the confluent.topic.replication.factor property to 1.

You can override producer-specific properties by using the producer.override.* prefix (for source connectors) and consumer-specific properties by using the consumer.override.* prefix (for sink connectors).
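
For example, because this is a source connector, a producer override might look like the following sketch; the property value is illustrative:

# Allow the connector's producer to send larger requests (2 MB)
producer.override.max.request.size=2097152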

You can use the defaults or customize the other properties as well. For example, the confluent.topic.client.id property defaults to the name of the connector with -licensing suffix. You can specify the configuration settings for brokers that require SSL or SASL for client connections using this prefix.

You cannot override the cleanup policy of a topic because the topic always has a single partition and is compacted. Also, do not specify serializers and deserializers using this prefix; they are ignored if added.