Configuration Reference for Oracle CDC Source Connector for Confluent Platform¶
The sections on this page describe each of the Oracle CDC Source connector configuration properties and group them by level of importance.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see Oracle CDC Source Connector for Confluent Cloud.
High importance¶
oracle.server
The host name or address for the Oracle server.
- Type: string
- Valid Values: Valid IPv4/IPv6 address or host name
- Importance: high
oracle.port
The port number used to connect to Oracle.
- Type: int
- Default: 1521
- Valid Values: 1 to 65535 (inclusive)
- Importance: high
oracle.username
The name of the Oracle database user.
- Type: string
- Importance: high
oracle.password
The password for the Oracle database user.
- Type: string
- Importance: high
oracle.sid
The Oracle system identifier (SID) of a multitenant container database (CDB) or non-multitenant database where tables reside. Confluent recommends you use oracle.service.name to connect to the database using service names instead of using the SID. Maps to the SID parameter in the connect descriptor.
- Type: string
- Valid Values: Non-empty string
- Importance: high
oracle.pdb.name
The name of the pluggable database (PDB). This is not required in the case where tables reside in the CDB$ROOT database, or if you’re using a non-container database.
- Type: string
- Valid Values: Non-empty string
- Importance: high
oracle.service.name
The Oracle service name. If set, the connector always connects to the database using the provided service name. The oracle.service.name maps to the SERVICE_NAME parameter in the connect descriptor. For the multitenant container database (CDB) or non-multitenant database, this does not need to be specified.
Important
- Confluent recommends you set the oracle.service.name to the container database (CDB) service name when using a pluggable database (PDB).
- When this property is set, it is used in the connect descriptor instead of oracle.sid.
- Type: string
- Default: “”
- Importance: high
key.converter
Converter class used to serialize and format all record keys written to Kafka and deserialize all record keys read from Kafka. Examples of common formats include JSON, Avro, and Protobuf. The converter may require additional key.converter.* properties. For more details, see Configuring Key and Value Converters.
- Type: class
- Valid Values: A valid converter class
- Importance: high
value.converter
Converter class used to serialize and format all record values written to Kafka and deserialize all record values read from Kafka. Examples of common formats include JSON, Avro, and Protobuf. A valid converter must be specified in the connector configuration to capture change events to the table change event topics. The converter may require additional value.converter.* properties. For more details, see Configuring Key and Value Converters.
- Type: class
- Valid Values: A valid converter class
- Importance: high
table.inclusion.regex
The regular expression that matches the fully-qualified table names. The values are matched (case sensitive) with the object names stored in the data dictionary. (The value is uppercase unless created as a quoted identifier. Database and PDB names are always stored as uppercase in the data dictionary.) Ensure the casing of the SID portion of the identifier is consistent with the oracle.sid value specified. That is, if the SID is lowercase on the database side, the SID section of the regex must also be lowercase; the same principle applies for uppercase values. For example, if the SID is orclcdb, the regex should be orclcdb[.]C##MYUSER[.]table1, and if the SID is ORCLCDB, the regex should be ORCLCDB[.]C##MYUSER[.]table1.
- For a non-container database, the fully-qualified name includes the SID and schema name (for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\\.MYUSER\\.(ORDERS|CUSTOMERS)).
- For a multitenant database (CDB), the fully-qualified name includes the SID and schema name (for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).
- For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name (for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).
For more details, refer to Regex Pattern Issues.
- Type: string
- Valid Values: A valid regular expression, or a blank string when table.topic.name.template is blank.
- Importance: high
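For example, a hypothetical inclusion pattern that captures the ORDERS and CUSTOMERS tables owned by C##MYUSER in the ORCLPDB1 pluggable database (all names are illustrative placeholders) could look like this in the connector configuration:
table.inclusion.regex=ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS)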
table.exclusion.regex
The regular expression that matches the fully-qualified names of the tables to exclude. The values are matched (case sensitive) with the object names stored in the data dictionary. (The value is uppercase unless created as a quoted identifier. Database and PDB names are always stored as uppercase in the data dictionary.) Ensure the casing of the SID portion of the identifier is consistent with the oracle.sid value specified. That is, if the SID is lowercase on the database side, the SID section of the regex must also be lowercase; the same principle applies for uppercase values. For example, if the SID is orclcdb, the regex should be orclcdb[.]C##MYUSER[.]table1, and if the SID is ORCLCDB, the regex should be ORCLCDB[.]C##MYUSER[.]table1.
- For a non-container database, the fully-qualified name includes the SID and schema name (for example, ORCLDB[.]MYUSER[.](ORDERS|CUSTOMERS) or ORCLDB\\.MYUSER\\.(ORDERS|CUSTOMERS)).
- For a multitenant database (CDB), the fully-qualified name includes the SID and schema name (for example, ORCLCDB[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLCDB\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).
- For a multitenant database (PDB), the fully-qualified name includes the PDB and schema name (for example, ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS) or ORCLPDB1\\.C##MYUSER\\.(ORDERS|CUSTOMERS)).
For more details, refer to Regex Pattern Issues.
- Type: string
- Default: blank
- Valid Values: A blank string or a valid regular expression.
- Importance: high
table.topic.name.template
The template that defines the name of the Kafka topic where the change event is written. The value can be a constant if the connector writes all change events from all captured tables to one topic. This can be left blank only if the connector has to write events only to the redo log topic and not to the table change event topics. Special characters, including \, $, {, and }, must be escaped with \ when not intended to be part of a template variable.
- Type: string
- Default: ${fullyQualifiedTableName}
- Valid Values: Template variables which resolve to a valid topic name or a blank string. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
- Importance: high
redo.log.topic.name
The template for the name of the Kafka topic where the connector records all redo log events.
- Type: string
- Default: ${connectorName}-${databaseName}-redo-log
- Valid Values: Template variables which resolve to a valid topic name. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
- Importance: high
redo.log.consumer.bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster with the redo log topic. This list is only used to connect to the initial hosts to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... This list does not need to contain the full set of servers. You should enter more than one host/port pair in case a server is offline. A valid list of bootstrap servers must be specified for the redo log consumers to read events from the redo log topic and write change events to the table change event topics. This is not required if table.topic.name.template is set to the empty string.
- Type: string
- Valid Values: Kafka cluster server host/port pairs
- Importance: high
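Putting the high-importance properties together, a minimal sketch of a connector configuration for a pluggable database might look like the following. The connector class, host, credentials, regex, and Schema Registry values are illustrative assumptions, not values taken from this reference:
connector.class=io.confluent.connect.oracle.cdc.OracleCdcSourceConnector
oracle.server=database.example.com
oracle.port=1521
oracle.sid=ORCLCDB
oracle.service.name=ORCLCDB
oracle.pdb.name=ORCLPDB1
oracle.username=C##MYUSER
oracle.password=my-secret-password
table.inclusion.regex=ORCLPDB1[.]C##MYUSER[.](ORDERS|CUSTOMERS)
table.topic.name.template=${fullyQualifiedTableName}
redo.log.topic.name=${connectorName}-${databaseName}-redo-log
redo.log.consumer.bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081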
Medium importance¶
start.from
When starting for the first time, this is the position in the redo log where the connector should start. Specifies an Oracle System Change Number (SCN) or a database timestamp with format yyyy-MM-dd HH:mm:SS in the database time zone.
- Type: string
- Default: snapshot
- Valid values: snapshot, current, and force_current
  - snapshot: Instructs the connector to perform an initial snapshot of each captured table prior to capturing changes.
  - current: Instructs the connector to start from the current Oracle SCN without snapshotting.
  - force_current: The same as current, but ignores any previously stored offsets when the connector is restarted. Use this option with caution, as it can result in losing changes between the SCN stored in the offsets and the current SCN. It should only be used to recover the connector when the SCN stored in the offsets is no longer available in the Oracle archive logs. Every option other than force_current causes the connector to resume from the stored offsets in case of task restarts or rebalances.
- Importance: medium
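For example, to begin from a specific position rather than taking a snapshot, either of the following (hypothetical values) could be set before the connector first starts:
# Start from a specific SCN
start.from=1652483
# Or start from a database timestamp in the database time zone
# start.from=2024-01-15 08:30:00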
key.template
The template that defines the Kafka record key for each change event. By default, the record key contains a concatenated primary key value delimited by an underscore (_) character. Use ${primaryKeyStructOrValue} to contain either the sole column value of a single-column primary key or a STRUCT with the multi-column primary key fields (or null if the table has no primary or unique key). Use ${primaryKeyStruct} to always use a STRUCT for primary keys that have one or more columns (or null if there is no primary or unique key). If the template contains variables or string literals, the record key is the string with the variables resolved and replaced.
- Type: string
- Default: ${primaryKeyStructOrValue}
- Importance: medium
max.batch.size
The maximum number of records to be returned in a single batch from the connector to Kafka Connect. The connector may return fewer records if no additional records are available.
- Type: int
- Default:
1000
- Importance: medium
query.timeout.ms
The timeout in milliseconds (ms) for any query submitted to Oracle. The default is 5 minutes (or 300000 milliseconds). If set to a negative value, the connector does not enforce a timeout on queries.
- Type: long
- Default:
300000
- Importance: medium
max.retry.time.ms
The maximum total time in milliseconds (ms) the connector tries to reconnect after receiving a retriable exception. A non-positive value disables retries. The default is 86400000 ms (24 hours).
- Type: long
- Default:
86400000
- Importance: medium
redo.log.poll.interval.ms
The interval between polls to retrieve the database logs. This has no effect when using Oracle database versions prior to 19c.
- Type: int
- Default:
1000
- Importance: medium
snapshot.row.fetch.size
The number of rows to provide as a hint to the JDBC driver when fetching table rows in a snapshot. A value of 0 disables this hint.
- Type: int
- Default:
2000
- Importance: medium
redo.log.row.fetch.size
The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. A value of 0 disables this hint. When continuous mine is available (database versions prior to Oracle 19c), the mining query from the connector waits until the number of rows available from the redo log is at least the value specified for fetch size before returning the results.
- Type: int
- Default: 10 for database versions prior to Oracle 19c and 1000 for other database versions
- Importance: medium
poll.linger.ms
The maximum time to wait for a record before returning an empty batch. The call to poll can return early, before poll.linger.ms expires, if max.batch.size records are received.
- Type: long
- Default: 5000
- Importance: medium
snapshot.threads.per.task
The number of threads that can be used in each task to perform snapshots. This is only useful for a task if the number of tables assigned to that task is greater than this value.
- Type: int
- Default: 4
- Importance: medium
heartbeat.interval.ms
The interval at which the connector emits heartbeats to an internal heartbeat topic (configured using heartbeat.topic.name). The default value of heartbeat.interval.ms is 0, which disables the heartbeat mechanism. Confluent recommends you set the heartbeat.interval.ms parameter to a value on the order of minutes to hours in environments where the connector is configured to capture tables that are infrequently updated, so the source offsets can move forward. Otherwise, a task restart could cause the connector to fail with an ORA-01291 missing logfile error if the archived redo log file corresponding to the stored source offset has been purged from the database. For more information about heartbeats, see Infrequently Updated Databases.
- Type: int
- Default: 0
- Importance: medium
heartbeat.topic.name
The name of the Kafka topic to which the connector emits heartbeat events at the regular interval configured using heartbeat.interval.ms. Note that if heartbeat.interval.ms is set to 0, the connector will not send any heartbeat messages and the topic won’t be created.
- Type: string
- Default: ${connectorName}-${databaseName}-heartbeat-topic
- Valid Values: Template variables which resolve to a valid topic name. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
- Importance: medium
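As a sketch, a deployment that captures infrequently updated tables might emit a heartbeat once per hour; the one-hour interval is only an example value:
heartbeat.interval.ms=3600000
heartbeat.topic.name=${connectorName}-${databaseName}-heartbeat-topic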
use.transaction.begin.for.mining.session
Set the start SCN for a log mining session to the start SCN of the oldest relevant open transaction, if one exists. A relevant transaction is defined as a transaction that contains changes to tables that the connector is set up to capture. Confluent recommends you set this to true when connecting to Oracle Real Application Clusters (RAC) databases, or if large object (LOB) datatype support is enabled (using enable.large.lob.object.support). This configuration is applicable only for Oracle database versions 19c and later.
- Type: boolean
- Default:
false
- Importance: medium
log.mining.transaction.age.threshold.ms
Specifies the threshold (in milliseconds) for the transaction age. Transaction age is defined as the amount of time the transaction has been open on the database. If the transaction age exceeds this threshold, an action is taken depending on the value set in log.mining.transaction.threshold.breached.action. The default value is -1, which means a transaction is retained in the buffer until the connector receives the commit or rollback event for the transaction. This configuration is applicable only when use.transaction.begin.for.mining.session is set to true.
- Type: long
- Default:
-1
- Importance: medium
log.mining.transaction.threshold.breached.action
Specifies the action to take when an active transaction exceeds the threshold defined using the log.mining.transaction.age.threshold.ms configuration.
- Type: string
- Default: warn
- Valid values: discard and warn
  - discard: Drop long-running transactions that exceed the threshold age from the buffer and skip emitting any records associated with these transactions.
  - warn: Log a warning mentioning the oldest transaction that exceeds the threshold.
- Importance: medium
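For illustration, a RAC or LOB-enabled deployment could bound how long open transactions stay buffered by combining these settings; the one-hour threshold is an arbitrary example:
use.transaction.begin.for.mining.session=true
log.mining.transaction.age.threshold.ms=3600000
log.mining.transaction.threshold.breached.action=discard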
Low importance¶
redo.log.corruption.topic
The name of the Kafka topic where the connector records events that describe errors and corrupted portions of the Oracle redo log (missed data). This can optionally use the template variables ${connectorName}, ${databaseName}, and ${schemaName}. A blank topic name (the default) designates that this information is not written to Kafka.
- Type: string
- Default: blank
- Importance: low
redo.log.consumer.fetch.min.bytes
The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for that amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or the fetch request times out waiting for data to arrive. Setting this to a value greater than 1 causes the server to wait for a larger amount of data to accumulate, which can improve server throughput at the cost of additional latency.
- Type: int
- Default:
1
- Importance: low
redo.log.consumer.max.partition.fetch.bytes
The maximum amount of per-partition data the server will return. Records are fetched in batches by the consumer. If the first record batch in the first fetched non-empty partition is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. (This is not an absolute maximum.) The maximum record batch size accepted by the broker is defined using message.max.bytes in the Kafka broker configuration or max.message.bytes in the topic configuration. See fetch.max.bytes for limiting the consumer request size.
- Type: int
- Default:
1048576
- Importance: low
redo.log.consumer.fetch.max.bytes
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. (This is not an absolute maximum.) The maximum record batch size accepted by the broker is defined using message.max.bytes in the Kafka broker configuration or max.message.bytes in the topic configuration. Note that the consumer performs multiple fetches in parallel.
- Type: long
- Default:
52428800
- Importance: low
redo.log.consumer.max.poll.records
The maximum number of records returned in a single call to poll().
- Type: int
- Default:
500
- Importance: low
redo.log.consumer.request.timeout.ms
Controls the maximum amount of time the client waits for the request response. If the response is not received before the timeout elapses, the client resends the request (if necessary) or fails the request if retries are exhausted.
- Type: int
- Default:
30000
- Importance: low
redo.log.consumer.receive.buffer.bytes
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the operating system default buffer size is used.
- Type: int
- Default:
65536
- Importance: low
redo.log.consumer.send.buffer.bytes
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the operating system default buffer size is used.
- Type: int
- Default:
131072
- Importance: low
behavior.on.dictionary.mismatch
Specifies the desired behavior when the connector is not able to parse the value of a column due to a dictionary mismatch caused by a DDL statement. This can happen if the online dictionary mode is specified but the connector is streaming historical data recorded before the DDL changes occurred. The default option, fail, causes the connector task to fail. The log option logs the unparsable statement and skips the problematic record without failing the connector task.
- Type: string
- Default:
fail
- Importance: low
behavior.on.unparsable.statement
Specifies the desired behavior when the connector encounters a SQL statement that could not be parsed. The default option, fail, causes the connector task to fail. The log option logs the unparsable statement and skips the problematic record without failing the connector task.
- Type: string
- Default:
fail
- Importance: low
oracle.dictionary.mode
The dictionary handling mode used by the connector. See Address DDL Changes in Oracle Database for Confluent Platform for more information.
- Type: string
- Default: auto
- Valid Values: auto, online, or redo_log
  - auto: The connector uses the dictionary from the online catalog until it encounters a DDL statement that evolves the table schema, at which point it starts using the dictionary from archived redo logs. Once the DDL statement has been processed, the connector reverts to using the online catalog. Use this mode if DDL statements are expected.
  - online: The connector always uses the online dictionary catalog. Use online mode if no DDL statements are expected, because DDL statements aren’t supported in online mode.
  - redo_log: The connector always uses the dictionary catalog from archived redo logs. Use this mode if you cannot access the online redo log.
- Importance: low
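For example, a deployment that expects occasional DDL on captured tables could state the default explicitly, while one that never expects DDL could pin the online catalog; both lines are illustrative:
oracle.dictionary.mode=auto
# or, if no DDL statements are expected:
# oracle.dictionary.mode=online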
log.mining.archive.destination.name
The name of the archive log destination to use when mining archived redo logs. You can configure the connector to use a specific destination using this property (for example, LOG_ARCHIVE_DEST_2). This is only applicable for Oracle database versions 19c and later.
- Type: string
- Default: “”
- Importance: low
record.buffer.mode
Where to buffer records that are part of the transaction but may not yet be committed.
Important
Database record buffering mode is not supported in Oracle Database version 19c and later. Use connector record buffering mode instead.
- Type: string
- Default:
connector
- Valid Values: connector or database
  - connector: Buffer uncommitted transactions in connector memory.
  - database: Buffer the records in the database. This option uses the COMMITTED_DATA_ONLY flag to LogMiner so that the connector only receives committed records. Transactions that are rolled back or in progress are filtered out, as are internal redo records. Use this option if the worker where the redo log processing task (task 0) is running is memory constrained and you would rather do buffering in the database. Note, though, that this option increases database memory usage to stage all redo records within a single transaction in memory until LogMiner finds the commit record for that transaction. Therefore, it is possible to exhaust memory.
- Importance: low
max.batch.timeout.ms
The maximum time to wait for a record before returning an empty batch. Must be at least 1000 milliseconds (one second). The default is 60000 milliseconds (one minute).
Important
- No longer in active support. Use poll.linger.ms instead.
- Type: long
- Default:
60000
- Importance: low
max.buffer.size
The maximum number of records, from all snapshot threads and from the redo log, that can be buffered into batches. The default of 0 means a buffer size is computed from the maximum batch size (max.batch.size) and the number of threads (snapshot.threads.per.task).
- Type: int
- Default:
0
- Importance: low
lob.topic.name.template
The template that defines the name of the Kafka topic where the connector writes LOB objects. The value can be a constant if the connector writes all LOB objects from all captured tables to one topic. Or, the value can include any supported template variables (for example, ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, and ${connectorName}). The default is empty, which ignores all LOB type columns if any exist on captured tables. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable. Any character that is not a valid character for a topic name is replaced by an underscore in the topic name.
- Type: string
- Default: “”
- Valid Values: Template variables which resolve to a valid topic name or a blank string. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
- Importance: low
enable.large.lob.object.support
If true, the connector supports large LOB objects that are split across multiple redo log records. The connector emits commit messages to the redo log topic and uses these commit messages to track when a large LOB object can be emitted to the LOB topic.
- Type: boolean
- Default: false
- Importance: low
numeric.mapping
Map NUMERIC values by precision and optionally scale to primitive or decimal types.
- Use none if all NUMERIC columns are to be represented by Connect’s DECIMAL logical type.
- Use best_fit_or_decimal if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s DECIMAL logical type will be used instead, and the values will be represented in binary form within the change events.
- Use best_fit_or_double if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s FLOAT64 type will be used instead.
- Use best_fit_or_string if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s STRING type will be used instead.
- Use precision_only to map NUMERIC columns based only on the column’s precision, assuming that the column’s scale is 0.
The none option is the default, but may lead to serialization issues since Connect’s DECIMAL type is mapped to its binary representation. One of the best_fit_or options is often preferred. For backwards compatibility, the best_fit option is also available; it behaves the same as best_fit_or_decimal. Updating this property requires deleting the table topic and the registered schemas if you use a non-JSON value.converter.
- Type: string
- Default: none
- Importance: low
numeric.default.scale
The default scale to use for numeric types when the scale cannot be determined.
- Type: int
- Default: 127
- Importance: low
oracle.date.mapping
Map Oracle DATE values to Connect types.
- Use date if all the DATE columns are to be represented by Connect’s Date logical type.
- Use timestamp if the DATE columns should be cast to Connect’s Timestamp.
The date option is the default value for backward compatibility. Despite the name similarity, the Oracle DATE type has different semantics than the Connect Date; timestamp is often preferred for semantic similarity.
- Type: string
- Default: date
- Importance: low
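As an illustration, the following combination maps NUMERIC columns to primitive types where possible (falling back to DECIMAL) and maps DATE columns to Connect Timestamps; whether these choices fit depends on the downstream consumers:
numeric.mapping=best_fit_or_decimal
oracle.date.mapping=timestamp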
emit.tombstone.on.delete
If true, delete operations emit a tombstone record with a null value.
- Type: boolean
- Default: false
- Importance: low
oracle.fan.events.enable
Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events will not be used even if they are supported by the database. You should only enable this feature when using an Oracle RAC setup with FAN events. Enabling the feature may cause connection issues when the database is not set up to use FAN events.
- Type: boolean
- Default: false
- Importance: low
table.task.reconfig.checking.interval.ms
The interval for the background monitoring thread to examine changes to tables and reconfigure table placement if necessary. The default is 300000 milliseconds (5 minutes).
- Type: long
- Default:
300000
- Importance: low
table.rps.logging.interval.ms
The interval for the background thread to log current requests per second (RPS) for each table.
- Type: long
- Default:
60000
- Importance: low
log.mining.end.scn.deviation.ms
Calculates the end SCN of log mining sessions as the approximate SCN that corresponds to the point in time that is log.mining.end.scn.deviation.ms milliseconds before the current SCN obtained from the database. The default value is 3 seconds on RAC environments and is not set otherwise. This configuration is applicable only for Oracle database versions 19c and later. Setting this configuration to a lower value on a RAC environment introduces the potential for data loss at high load. A higher value increases the end-to-end latency for change events.
- Type: long
- Default: 0 for single node and 3000 for RAC environments
- Importance: low
output.before.state.field
The name of the field in the change record written to Kafka that contains the before state of changed database rows for an update operation. A blank value signals that this field should not be included in the change records. For more details, see Before state for update operation.
- Type: string
- Default: “”
- Importance: low
output.table.name.field
The name of the field in the change record written to Kafka that contains the fully-qualified name of the affected Oracle table. A blank value signals that this field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the fully-qualified name of the affected Oracle table as a header with the given name.
- Type: string
- Default:
table
- Importance: low
output.scn.field
The name of the field in the change record written to Kafka that contains the Oracle System Change Number (SCN) where this change was made. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the SCN as a header with the given name.
- Type: string
- Default:
scn
- Importance: low
output.commit.scn.field
The name of the field in the change record written to Kafka that contains the Oracle System Change Number (SCN) when this transaction was committed. An empty value indicates that the field should not be included in the change records.
- Type: string
- Default: “”
- Importance: low
output.op.type.field
The name of the field in the change record written to Kafka that contains the operation type for this change event. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation type as a header with the given name.
- Type: string
- Default:
op_type
- Importance: low
output.op.ts.field
The name of the field in the change record written to Kafka that contains the operation timestamp for the change event. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation timestamp as a header with the given name.
- Type: string
- Default:
op_ts
- Importance: low
output.current.ts.field
The name of the field in the change record written to Kafka that contains the current timestamp of the Kafka Connect worker when this change event was processed. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the current timestamp as a header with the given name.
- Type: string
- Default:
current_ts
- Importance: low
output.row.id.field
The name of the field in the change record written to Kafka that contains the row ID of the changed row. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the row ID as a header with the given name.
- Type: string
- Default:
row_id
- Importance: low
output.username.field
The name of the field in the change record written to Kafka that contains the name of the Oracle user that executed the transaction. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the username as a header with the given name.
- Type: string
- Default:
username
- Importance: low
output.redo.field
The name of the field in the change record written to Kafka that contains the original redo data manipulation language (DML) statement from which this change record was created. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the redo statement as a header with the given name.
- Type: string
- Default: “”
- Importance: low
output.undo.field
The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change and represents the “before” state of the row. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the undo statement as a header with the given name.
- Type: string
- Default: “”
- Importance: low
output.op.type.read.value
The value of the operation type for a read (snapshot) change event. By default this is R (read).
- Type: string
- Default:
R
- Importance: low
output.op.type.insert.value
The value of the operation type for an insert change event. By default this is I (insert).
- Type: string
- Default:
I
- Importance: low
output.op.type.update.value
The value of the operation type for an update change event. By default this is U (update).
- Type: string
- Default:
U
- Importance: low
output.op.type.delete.value
The value of the operation type for a delete change event. By default this is D (delete).
- Type: string
- Default:
D
- Importance: low
output.op.type.truncate.value
The value of the operation type for a truncate change event. By default this is T (truncate).
- Type: string
- Default:
T
- Importance: low
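For example, a configuration could move some metadata into record headers and rename the operation-type values to match a downstream convention; the particular choices below are illustrative only:
output.table.name.field=header:table
output.op.type.field=header:op_type
output.before.state.field=before
output.op.type.insert.value=INSERT
output.op.type.update.value=UPDATE
output.op.type.delete.value=DELETE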
redo.log.startup.polling.limit.ms
The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. When this wait time expires, the connector moves to a failed state.
- Type: long
- Default: 300000
- Importance: low
snapshot.by.table.partitions
Whether the connector should perform snapshots on each table partition if the table is defined to use partitions. This is false by default, meaning that one snapshot is performed on each table in its entirety.
- Type: boolean
- Default: false
- Importance: low
oracle.validation.result.fetch.size
The fetch size to use while querying the database for validations. This is used to query the list of tables and to validate the supplemental logging level.
- Type: int
- Default:
5000
- Importance: low
redo.log.row.poll.fields.include
A comma-separated list of fields from the V$LOGMNR_CONTENTS view to include in the redo log events.
- Type: list
- Importance: low
redo.log.row.poll.fields.exclude
A comma-separated list of fields from the V$LOGMNR_CONTENTS view to exclude from the redo log events.
- Type: list
- Importance: low
redo.log.row.poll.username.include
A comma-separated list of database usernames. When this property is set, the connector captures changes only from the specified set of database users. You cannot set this property along with the redo.log.row.poll.username.exclude property.
- Type: list
- Importance: low
redo.log.row.poll.username.exclude
A comma-separated list of database usernames. When this property is set, the connector does not capture changes made by the specified set of database users. You cannot set this property along with the redo.log.row.poll.username.include property.
- Type: list
- Importance: low
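For instance, to capture only changes made by a single application user, or alternatively to ignore changes made by administrative users (user names are placeholders), set one of the two filters but not both:
redo.log.row.poll.username.include=APP_USER
# or, alternatively:
# redo.log.row.poll.username.exclude=SYS,GGADMIN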
db.timezone
Default timezone to assume when parsing Oracle DATE and TIMESTAMP types for which timezone information is not available. For example, if db.timezone=UTC, the data in both DATE and TIMESTAMP columns is parsed as if in the UTC timezone. The value has to be a valid java.util.TimeZone ID.
- Type: string
- Default: UTC
- Importance: low
db.timezone.date
The default timezone to assume when parsing the Oracle DATE type for which timezone information is not available. If db.timezone.date is set, the value of db.timezone for the DATE type is overridden with the value in db.timezone.date. For example, if db.timezone=UTC and db.timezone.date=America/Los_Angeles, the TIMESTAMP data is parsed as if in the UTC timezone, and the DATE data is parsed as if in the America/Los_Angeles timezone. The value has to be a valid java.util.TimeZone ID.
- Type: string
- Importance: low
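As a sketch, a database whose TIMESTAMP columns are written in UTC but whose DATE columns represent local Pacific time could be handled as follows; the zone IDs are examples:
db.timezone=UTC
db.timezone.date=America/Los_Angeles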
oracle.supplemental.log.level
Database supplemental logging level for connector operation. If set to full, the connector validates that the supplemental logging level on the database is FULL, and then captures snapshots and CDC events for the specified tables whenever table.topic.name.template is not set to "". When the level is set to msl, the connector doesn’t capture CDC change events; it only captures snapshots if table.topic.name.template is not set to "". Note that this setting is ignored if table.topic.name.template is set to "", because the connector will only capture redo logs. This setting defaults to the full supplemental logging level mode.
- Type: string
- Default: full
- Valid Values: [msl, full]
- Importance: low
ldap.url
The connection URL of the LDAP server if using OID-based LDAP.
- Type: string
- Importance: low
ldap.security.principal
The login principal or user if using SIMPLE Authentication for LDAP.
- Type: string
- Importance: low
ldap.security.credentials
The login password for principal if using SIMPLE Authentication for LDAP.
- Type: string
- Importance: low
oracle.ssl.truststore.file
If using SSL for encryption and server authentication, set this to the location of the trust store containing server certificates that should be trusted.
- Type: string
- Default: “”
- Importance: low
oracle.ssl.truststore.password
If using SSL for encryption and server authentication, the password of the trust store containing server certificates that should be trusted.
- Type: string
- Default: “”
- Importance: low
oracle.kerberos.cache.file
If using Kerberos 5 authentication, set this to the location of the Kerberos 5 ticket cache file on all the Connect workers.
- Type: string
- Default: “”
- Importance: low
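For example, an SSL-enabled listener could be trusted with a truststore such as the following; the path and password are placeholders:
oracle.ssl.truststore.file=/etc/kafka/secrets/oracle.truststore.jks
oracle.ssl.truststore.password=truststore-password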
log.sensitive.data
If set to true, the connector logs sensitive data, such as customer records, SQL queries, and exception traces containing sensitive data. Confluent recommends you set this parameter to true only in cases where logging sensitive data is acceptable and necessary for troubleshooting.
- Type: boolean
- Default: false
- Importance: low
retry.error.codes
A comma-separated list of Oracle error codes (for example, 12505, 12528,...) that the connector retries on, up to the time defined by the max.retry.time.ms parameter. By default, the connector retries in case of a recoverable or transient SQL exception and on certain Oracle error codes.
- Type: list
- Importance: low
enable.metrics.collection
If set to true, the connector records metrics that can be used to gain insight into the connector and troubleshoot issues. These metrics can be accessed using Java Management Extensions (JMX).
- Type: boolean
- Default: false
- Importance: low
Confluent Platform license¶
confluent.topic.bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster used for licensing. All servers in the cluster will be discovered from the initial connection. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
- Type: list
- Importance: high
confluent.topic
Name of the Kafka topic used for Confluent Platform configuration, including licensing information.
- Type: string
- Default: _confluent-command
- Importance: low
confluent.topic.replication.factor
The replication factor for the Kafka topic used for Confluent Platform configuration, including licensing information. This is used only if the topic does not already exist, and the default of 3 is appropriate for production use. If you are using a development environment with less than 3 brokers, you must set this to the number of brokers (often 1).
- Type: int
- Default: 3
- Importance: low
Connection pool properties¶
The following properties are used to configure the connection pool used to share connections from the connector to the Oracle database.
connection.pool.initial.size
The number of JDBC connections created immediately upon startup, before any connection is needed. A non-zero initial size can be used to reduce the ramp-up time incurred by priming the pool to its optimal size.
- Type: int
- Default: 0
- Importance: medium
connection.pool.min.size
The minimum number of available and borrowed JDBC connections in each worker’s connection pool. A connection pool always tries to return to the minimum pool size specified unless the minimum number has yet to be reached.
- Type: int
- Default: 0
- Importance: medium
connection.pool.max.size
The maximum number of available and borrowed JDBC connections in each worker’s connection pool. If the maximum number of connections are borrowed (in use), no connections will be available for other uses until a borrowed connection is returned to the pool.
- Type: int
- Default: 1
- Importance: medium
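For illustration, a worker that should keep a couple of warm connections available but never open more than four might be tuned as follows; the sizes are arbitrary examples:
connection.pool.initial.size=2
connection.pool.min.size=2
connection.pool.max.size=4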
connection.pool.login.timeout.ms
The maximum time in milliseconds to wait for a connection to be established before failing. A value of zero specifies that the system timeout should be used.
- Type: long
- Default: 0
- Importance: low
connection.pool.connection.wait.timeout.ms
Specifies how long an application request waits to obtain a connection if there are no longer any connections in the pool. A connection pool runs out of connections if all connections in the pool are being used (borrowed) and if the pool size has reached its maximum connection capacity, as specified by the maximum pool size property. This timeout feature improves overall application usability by minimizing the amount of time an application is blocked and provides the ability to implement a graceful recovery. A value of zero disables the feature.
- Type: long
- Default: 0
- Importance: low
connection.pool.validation.on.borrow.enabled
Specifies whether connections are validated when they are borrowed from the connection pool.
- Type: boolean
- Default:
true
- Importance: low
connection.pool.validation.sql
Specifies the SQL statement to use when validating connections.
- Type: string
- Default: null
- Importance: low
connection.pool.max.ttl.ms
The maximum time a borrowed connection can remain borrowed before the connection is reclaimed by the pool. A value of
0
disables the feature.- Type: long
- Default:
0L
- Importance: low
connection.pool.abandoned.timeout.ms
The maximum time in milliseconds that a borrowed connection can remain unused before the pool reclaims the connection. A value of
0
disables the feature.- Type: long
- Default:
0L
- Importance: low
connection.pool.inactive.timeout.ms
Specifies how long an available connection can remain idle before it is closed and removed from the pool. This timeout property is only applicable to available connections and does not affect borrowed connections. This property helps conserve resources that are otherwise lost on maintaining connections that are no longer being used. The inactive connection timeout (together with the maximum pool size) allows a connection pool to grow and shrink as application load changes. A value of
0
disables the feature.- Type: long
- Default:
0L
- Importance: low
connection.pool.check.interval.timeout.ms
Specifies how frequently the timeout properties (abandoned connection timeout, time-to-live connection timeout, and inactive connection timeout) are enforced. The default is 30 seconds (30000 milliseconds).
- Type: long
- Default: 0
- Importance: low
connection.pool.validation.timeout.ms
The maximum duration in milliseconds for a connection validation on a borrowed connection.
- Type: long
- Default: 0
- Importance: low
connection.pool.trust.idle.time.ms
The time in milliseconds to trust a recently used and recently tested database connection and skip the validation test during connection checkout.
- Type: long
- Default: 0
- Importance: low
connection.pool.max.reuse.time.ms
Allows connections to be gracefully closed and removed from the pool after a connection has been in use for a specific amount of time. This property is typically used to periodically recycle connections in order to eliminate issues such as memory leaks.
- Type: long
- Default:
0L
- Importance: low
connection.pool.max.reuse.count
Allows connections to be gracefully closed and removed from the connection pool after a connection has been borrowed a specific number of times. This property is typically used to periodically recycle connections in order to eliminate issues such as memory leaks.
- Type: int
- Default: 0
- Importance: low
connection.pool.property.cycle.interval.ms
The time interval in milliseconds between checks to enforce connection pool timeout properties.
- Type: long
- Default:
0L
- Importance: low
connection.pool.property.max.statements
The maximum number of statements to cache for each connection. Statement caching is not used when set to 0 (the default).
- Type: int
- Default: 0
- Importance: low
connection.pool.harvest.trigger.count
Specifies the available connection threshold that triggers connection harvesting. For example, if the connection harvest trigger count is set to 10, then connection harvesting is triggered when the number of available connections in the pool drops to 10. A value of 2147483647 (the default) disables harvesting.
- Type: int
- Default: 0
- Importance: low
connection.pool.harvest.max.count
How many borrowed connections should be returned to the pool once the harvest trigger count has been reached.
- Type: int
- Default: 0
- Importance: low
connection.pool.fast.failover.enabled
Enables Fast Connection Failover (FCF) for the connection pool accessed using this pool-enabled data source. By default, FCF is disabled.
- Type: boolean
- Default: false
- Importance: low
redo.log.initial.delay.interval.ms
The amount of time to wait for in-flight transactions to complete after the initial snapshot is taken, before the connector starts reading redo logs. This is only relevant on a cold start when the connector is configured to capture change events.
- Type: long
- Default: 0
- Importance: low
Confluent license properties¶
You can put license-related properties in the connector configuration, or starting with Confluent Platform version 6.0, you can put license-related properties in the Connect worker configuration instead of in each connector configuration.
This connector is proprietary and requires a license. The license information is stored in the _confluent-command
topic. If the broker requires SSL for connections, you must include the security-related confluent.topic.*
properties
as described below.
confluent.license
Confluent issues enterprise license keys to each subscriber. The license key is text that you can copy and paste as the value for confluent.license. A trial license allows using the connector for a 30-day trial period. A developer license allows using the connector indefinitely for single-broker development environments. If you are a subscriber, contact Confluent Support for more information.
- Type: string
- Default: “”
- Valid Values: Confluent Platform license
- Importance: high
confluent.topic.ssl.truststore.location
The location of the trust store file.
- Type: string
- Default: null
- Importance: high
confluent.topic.ssl.truststore.password
The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
- Type: password
- Default: null
- Importance: high
confluent.topic.ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
- Type: string
- Default: null
- Importance: high
confluent.topic.ssl.keystore.password
The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
- Type: password
- Default: null
- Importance: high
confluent.topic.ssl.key.password
The password of the private key in the key store file. This is optional for client.
- Type: password
- Default: null
- Importance: high
confluent.topic.security.protocol
Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
- Type: string
- Default: “PLAINTEXT”
- Importance: medium
License topic configuration¶
A Confluent enterprise license is stored in the _confluent-command
topic.
This topic is created by default and contains the license that corresponds to
the license key supplied through the confluent.license
property. No public
keys are stored in Kafka topics.
The following describes how the default _confluent-command
topic is
generated under different scenarios:
- A 30-day trial license is automatically generated for the _confluent-command topic if you do not add the confluent.license property or leave this property empty (for example, confluent.license=).
- Adding a valid license key (for example, confluent.license=<valid-license-key>) adds a valid license in the _confluent-command topic.
Here is an example of the minimal properties for development and testing.
You can change the name of the _confluent-command
topic using the
confluent.topic
property (for instance, if your environment has strict
naming conventions). The example below shows this change and the configured
Kafka bootstrap server.
confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092
The example above shows the minimally required bootstrap server property that
you can use for development and testing. For a production environment, you add
the normal producer, consumer, and topic configuration properties to the
connector properties, prefixed with confluent.topic.
.
License topic ACLs¶
The _confluent-command
topic contains the license that corresponds to the
license key supplied through the confluent.license
property. It is created
by default. Connectors that access this topic require the following ACLs
configured:
- CREATE and DESCRIBE on the resource cluster, if the connector needs to create the topic.
- DESCRIBE, READ, and WRITE on the _confluent-command topic.
Important
You can also use DESCRIBE and READ without WRITE to restrict access to read-only for license topic ACLs. If a topic exists, the LicenseManager will not try to create the topic.
You can provide access either individually for each principal that will
use the license or use a wildcard entry to
allow all clients. The following examples show commands that you can use to
configure ACLs for the resource cluster and _confluent-command
topic.
Set a CREATE and DESCRIBE ACL on the resource cluster:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
  --add --allow-principal User:<principal> \
  --operation CREATE --operation DESCRIBE --cluster
Set a DESCRIBE, READ, and WRITE ACL on the _confluent-command topic:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
  --add --allow-principal User:<principal> \
  --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
Override Default Configuration Properties¶
You can override the replication factor using
confluent.topic.replication.factor
. For example, when using a Kafka cluster
as a destination with less than three brokers (for development and testing) you
should set the confluent.topic.replication.factor
property to 1
.
You can override producer-specific properties by using the
producer.override.*
prefix (for source connectors) and consumer-specific
properties by using the consumer.override.*
prefix (for sink connectors).
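For example, assuming the Connect worker permits client configuration overrides (controlled by the worker's connector.client.config.override.policy setting), a source connector could tune the producer that writes its records; the values shown are illustrative:
producer.override.batch.size=65536
producer.override.linger.ms=50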
You can use the defaults or customize the other properties as well. For example,
the confluent.topic.client.id
property defaults to the name of the connector
with -licensing
suffix. You can specify the configuration settings for
brokers that require SSL or SASL for client connections using this prefix.
You cannot override the cleanup policy of a topic because the topic always has a single partition and is compacted. Also, do not specify serializers and deserializers using this prefix; they are ignored if added.