Oracle XStream CDC Source Connector for Confluent Cloud¶
The fully-managed Oracle XStream CDC Source connector for Confluent Cloud captures all changes made to rows in an Oracle database and represents the changes as change event records in Apache Kafka® topics. The connector can be configured to capture changes from a subset of tables in a database by using an include regular expression to match the table identifiers. It can also be configured to not capture tables that match a separate exclude regular expression.
This Quick Start is for the fully-managed Confluent Cloud connector. If you are installing the connector locally for Confluent Platform, see Oracle XStream CDC Source for Confluent Platform.
If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.
Supported Versions¶
Be sure to review the following information before using the Oracle XStream CDC Source connector.
Oracle versions¶
The connector is compatible with the following Oracle versions:
- Oracle 19c Enterprise Edition
- Oracle 21c Enterprise Edition
Java versions¶
The connector requires Java version 17 or higher.
Limitations¶
Be sure to review the following information:
- The connector has not been tested against Oracle Exadata and managed database services from cloud service providers (CSPs), such as OCI and RDS.
- The connector does not work with Oracle Autonomous Databases and Oracle Standby databases (using Oracle Data Guard).
- The connector does not support Downstream Capture configurations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- The connector does not support the following Single Message Transforms (SMTs): GzipDecompress, TimestampRouter, and MessageTimestampRouter.
Manage custom offsets¶
You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To manage offsets:
- Use Confluent Cloud APIs. For more information, see Cluster API reference.
- Use either an Oracle System Change Number (SCN) or Logical Change Record (LCR) position.
To get the current offset, make a GET
request that specifies the environment, Kafka
cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
The following example shows the offset once the snapshot has been completed.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "4374567",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
],
"metadata": {
"observed_at": "2025-03-01T12:30:00.151015100Z"
}
}
The following example shows the offset once streaming is in progress.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "000000000044150e0000000100000001000000000044150d000000140000000102"
}
}
],
"metadata": {
"observed_at": "2025-03-01T12:30:00.151015100Z"
}
}
Responses include the following information:
- The position of latest offset.
- The observed time of the offset in the metadata portion of the payload. The
observed_at
time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Useobserved_at
to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. CallingGET
repeatedly will fetch more recently observed offsets. - Information about the connector.
To update the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies new offset and a patch type.
The following example shows how to update the offset using an SCN.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "4374567",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
]
}
The following example shows how to update the offset using an LCR position.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "0000000000432bd400000001000000010000000000432bd3000000140000000102"
}
}
]
}
Considerations:
- You can only make one offset change at a time for a given connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- For source connectors, the connector attempts to read from the position defined by the requested offsets.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the offset.
The following example shows the response to an offset update request using an SCN.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "4374567",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "PATCH"
}
The following example shows the response to an offset update request using an LCR position.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "0000000000432bd400000001000000010000000000432bd3000000140000000102"
}
}
],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "PATCH"
}
Responses include the following information:
- The requested position of the offsets in the source.
- The time of the request to update the offset.
- Information about the connector.
To delete the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
Considerations:
- Delete requests delete the offset for the provided partition and reset to the base state. A delete request is as if you created a fresh new connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not issue delete and patch requests at the same time.
- For source connectors, the connector attempts to read from the position defined in the base state.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the result.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "DELETE"
}
Responses include the following information:
- Empty offsets.
- The time of the request to delete the offset.
- Information about Kafka cluster and connector.
- The type of request.
To get the status of a previous offset request, make a GET
request that specifies the environment, Kafka cluster, and connector
name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
Considerations:
- The status endpoint always shows the status of the most recent PATCH/DELETE operation.
Response:
Successful calls return HTTP 200
with a JSON payload that describes the result. The following is an example
of an applied patch.
{
"request": {
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "2778404",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "00000000002a8684000000010000000100000000002a8680000000010000000102"
}
}
],
"applied_at": "2025-03-01T12:30:10.151015100Z"
}
Responses include the following information:
- The original request, including the time it was made.
- The status of the request: applied, pending, or failed.
- The time you issued the status request.
- The previous offsets. These are the offsets that the connector last updated prior to updating the offsets. Use these to try to restore the state of your connector if a patch update causes your connector to fail or to return a connector to its previous state after rolling back.
JSON payload¶
You can use either an Oracle System Change Number (SCN) or Logical Change Record (LCR) position to update the offsets of the Oracle XStream CDC Source connector.
The table below offers a description of the unique fields in the JSON payload for managing offsets of the connector using an Oracle SCN.
Field | Definition | Required/Optional |
---|---|---|
scn |
The SCN of the change. It is set to the current SCN during the snapshot phase and obtained from the LCR position during the streaming phase. To manage offsets, set this to the SCN from which to resume streaming. |
Required |
snapshot |
Indicates the type of snapshot. It is set to To manage offsets, set this to |
Required |
snapshot_completed |
Indicates whether the snapshot has been completed. To manage offsets, set this to |
Required |
The table below offers a description of the unique fields in the JSON payload for managing offsets of the connector using an LCR position.
Field | Definition | Required/Optional |
---|---|---|
lcr_position |
The position of the LCR. Set in streaming phase only. To manage offsets, set this to the LCR position from which to resume streaming. |
Required |
General considerations¶
An existing outbound server can be used if the SCN or LCR position used in the offsets request is equal to or greater than the outbound server’s processed low position. To find the processed low position for an outbound server, see Displaying the Processed Low Position for an Outbound Server.
An existing outbound server cannot be used if the SCN or LCR position used in the offsets request is earlier than the outbound server’s processed low position. In this case, a new capture process and outbound server much be created with a first SCN and start SCN that precede the SCN or LCR position used in the offset.
- The first SCN and start SCN must be a valid SCN and present in the redo log files available to the capture process.
- The first SCN can be set to any value returned by the following query:
SELECT DISTINCT FIRST_CHANGE#, NAME FROM V$ARCHIVED_LOG WHERE DICTIONARY_BEGIN = 'YES'
- The value returned in the
NAME
column indicates the redo log file containing the SCN corresponding to the first SCN. This redo log file, along with all subsequent redo log files, must be available to the capture process. If the query returns multiple distinctFIRST_CHANGE#
values, choose the first SCN value that is most appropriate for the capture process you are creating.
- The start SCN specified must be greater than or equal to the first SCN for the capture process.
To create a new connector with offsets, set the SCN in the offset and then start the connector in
recovery
snapshot mode using thesnapshot.mode
configuration property. The connector will first capture a snapshot of the schema for the capture tables, populating the schema history topic, and then begin streaming from the specified SCN in the offset. Once the recovery process is complete and streaming begins, reset thesnapshot.mode
to eitherinitial
orno_data
to prevent the connector from initiating a recovery upon future restarts.Important
The connector will fail if there have been schema changes to the captured tables after the specified SCN.
To update the offsets of an existing connector, set the SCN or LCR position in the offset.
Important
The connector will fail if there have been schema changes to the capture tables between the SCN or LCR position specified in the offsets request and the connector’s last processed SCN or LCR position.
Migrate connectors¶
Considerations:
The self-managed connector must be operating in streaming mode. If the self-managed connector is still in the process of making a snapshot, you can either create a new connector on Confluent Cloud which starts the snapshot process from the beginning or wait for the snapshot process to complete and follow the migration guidance.
The configurations of the self-managed connector must match the configurations of the fully-managed connector. You need to set the
snapshot.mode
configuration property torecovery
. This ensures that the connector will first capture a snapshot of the schema for the capture tables, populating the schema history topic, and then begin streaming from the specified SCN in the offset.Important
The connector will fail if there have been schema changes to the captured tables after the specified SCN.
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud Oracle XStream CDC Source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in an Oracle database and then monitoring and recording all subsequent row-level changes.
Before configuring the connector, see Oracle Database Prerequisites for Oracle database configuration information and post-configuration validation steps.
Prerequisites¶
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). For more information, see Schema Registry Enabled Environments.
- For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
To create and launch a Kafka cluster in Confluent Cloud, see Create a kafka cluster in Confluent Cloud.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 4: Enter the connector details¶
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the Add Oracle XStream CDC Source Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
- Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
- Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
Note
Freight clusters support only service accounts for Kafka authentication.
- Click Continue.
- Add the following database connection details:
- Database hostname: The IP address or hostname of the Oracle database server.
- Database port: The port number used to connect to Oracle database server. Defaults
to
1521
. - Database username: The name of the Oracle database user connecting to the Oracle database.
- Database password: The password for the Oracle database user connecting to the Oracle database.
- Database name: The name of the database to connect to. In a multitenant architecture, it refers to the container database (CDB) name.
- Database service name: Name of the database service to which to connect. In a multitenant container database, this is the service used to connect to the CDB. For Oracle Real Application Clusters (RACs), use the service created by Oracle XStream.
- Pluggable database (PDB) name: The name of the pluggable database (PDB) to connect to in a
multitenant architecture. By default, this is not set, indicating that the tables to
capture reside in the CDB root. If this is set, you must specify the CDB name in the
Database name
field. - XStream outbound server name: The name of the XStream outbound server to connect to.
- TLS mode: Specify whether to use Transport Layer Security (TLS) to connect to the
Oracle database. Defaults to
disable
. If you set this property toone-way
, the connector uses a TLS-encrypted connection and verifies the server’s TLS certificate against the configured Certificate Authority (CA) certificates. - Total number of Oracle processors to license: The number of Oracle processor licenses required for the source database server or cluster. To determine this out, multiply the total number of processor cores by a core processor licensing factor, as mentioned in the Oracle Processor Core Factor Table.
- Click Continue.
Add the following details:
Output messages
- Output Kafka record key format: Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF. Defaults to AVRO.
- Output Kafka record value format: Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
How should we name your topic(s)?
Topic prefix: The topic prefix provides a namespace for the Oracle database server or cluster used by the connector to capture changes. It must be unique and can include only alphanumeric characters, hyphens, dots, and underscores. This prefix is added to all Kafka topic names receiving events from this connector.
Warning
Do not change the value of this property. If changed, upon restart, the connector will start emitting events to new topics based on the revised value and not to the original topics, and it won’t be able to recover its database schema history topic.
Connector configuration
Table include list: A comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you want to capture. The connector will only capture changes from tables that match these expressions. Each identifier is of the form
schemaName.tableName
. By default, the connector captures changes from all non-system tables in each captured database.To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
Note
If you use this property, do not use the
table.exclude.list
property.Table exclude list: A comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you do not want to capture. The connector will only capture changes from any table that is not specified in the exclude list. Each identifier is of the form
schemaName.tableName
.To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
Note
If you use this property, do not use
table.include.list
property.Snapshot mode: The criteria for running a snapshot upon startup of the connector. Select one of the following snapshot options:
- initial (default): The snapshot includes both the structure (schema) and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
- no_data: The snapshot includes only the structure (schema) of captured tables. Specify this value if you want the connector to capture data only for changes that occur after the snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
- recovery: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth.
Warning
Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
Auto-restart policy
Enable Connector Auto-restart: Control the auto-restart behavior of the connector and its task in the event of user-actionable errors. Defaults to
true
, enabling the connector to automatically restart in case of user-actionable errors. Set this property tofalse
to disable auto-restart for failed connectors. In such cases, you would need to manually restart the connector.
Connector configuration
Skip unparseable DDL statement: Specifies whether the connector should ignore a DDL statement that cannot be parsed or stop processing for a human to address the issue. The safe default is
false
which causes the connector to fail when it encounters an unparseable DDL statement.Warning
Setting the value to
true
should be done with care as it will cause the connector to skip processing any DDL statement it cannot parse, and this could potentially lead to schema mismatches and data loss.Max retries on snapshot database errors: The number of retry attempts the connector will make to snapshot a table if a database error occurs. This configuration property currently only retries failures related to
ORA-01466
error. By default, no retries are attempted.Emit tombstone on delete: Controls whether a delete event is followed by a tombstone event. The following values are possible:
true (default): For each delete operation, the connector emits a delete event and a subsequent tombstone event.
false: For each delete operation, the connector emits only a delete event.
After a source record is deleted, a tombstone event (the default behavior) enables Kafka to completely delete all events that share the key of the deleted row in topics that have log compaction enabled.
Skipped operations: A comma-separated list of the operation types to skip during streaming. You can configure the connector to skip the following types of operations:
c
(create/insert)u
(update)d
(delete)t
(truncate)
You use
none
to indicate that no operations are skipped. By default, only truncate (t
) operations are skipped.Schema name adjustment mode: Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (default): Does not apply any adjustment.
avro: Replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like
_uxxxx
.Note
_
is an escape sequence like backslash in Java.
Field name adjustment mode: Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (default): Does not apply any adjustment.
avro: Replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like
_uxxxx
.Note
_
is an escape sequence like backslash in Java.
Heartbeat interval (ms): Controls the frequency of heartbeat messages sent by the connector to a heartbeat topic.
Useful in situations when no changes occur in the captured tables for an extended period.
In such cases, there are no change event messages generated, causing the committed source offset to remain unchanged.
As a result, the connector is unable to update the processed low watermark on the outbound server, which could result in the database retaining archived redo log files longer than needed.
The default value is
0
, which disables the heartbeat mechanism.
Database server’s operating system (OS) timezone: Specifies the database server’s operating system timezone. This is used to read the time when the LCR was generated at the source database. The default timezone is UTC. The value has to be a valid
java.time.ZoneId
identifier.
How should we handle data types?
Decimal handling mode: Specifies how the connector should handle NUMBER, DECIMAL and NUMERIC columns. You can set one of the following options:
precise (default): Uses
java.math.BigDecimal
to represent values, which are encoded in the change events using a binary representation and Kafka Connect’sorg.apache.kafka.connect.data.Decimal
type. Depending on the precision and scale, the most appropriate Kafka Connect integer type is used for integral values, ensuring that the value is represented without any loss of precision.string: Encodes values as formatted strings. Using the
string
option is easier to consume, but results in a loss of semantic information about the real type.double: Represents values using Java’s double. Using double values is easier, but can result in a loss of precision.
Time precision mode: Specifies how the connector should handle time, date, and timestamps columns. You can set one of the following options:
adaptive (default): Bases the precision of time, date, and timestamp values on the database column’s precision.
connect: Always represents time, date, and timestamp values using Kafka Connect’s built-in representations for
Time
,Date
, andTimestamp
, which uses millisecond precision regardless of the database columns’ precision.
Transforms
Single Message Transforms: To add a new SMT using the UI, see Add transforms. For more information about the Debezium SMT ExtractNewRecordState, see Debezium transformations.
For all property values and definitions, see Configuration Properties.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
To change the number of tasks, enter the desired number of tasks for the connector to use in the Maximum number of tasks field.
Note
The connector always operates with a single task.
Click Continue.
Verify the connection details by previewing the running configuration.
Tip
For information about previewing your connector output, see Data Previews for Confluent Cloud Connectors.
After you’ve validated that the properties are configured to your satisfaction, click Launch.
The status for the connector should go from Provisioning to Running.
If the connector is not running, see Oracle Database Prerequisites and review the Oracle database configuration information and post-configuration validation steps.
Step 5: Check the Kafka topic¶
After the connector is running, verify that records are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Using the Confluent CLI¶
Complete the following steps to set up and run the connector using the Confluent CLI.
Note
- Make sure you have all your prerequisites completed.
- The example commands use Confluent CLI version 2. For more information see, Confluent CLI v2.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
For example:
confluent connect plugin describe OracleXStreamSource
Example output:
The following are required configs:
connector.class : OracleXStreamSource
database.dbname
database.hostname
database.service.name
database.out.server.name
database.user
database.password
name
output.data.value.format
tasks.max
topic.prefix
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties to deploy an instance of the connector.
The connector setup below performs the following:
- Connects to the
ORCLPDB1
pluggable database located atdb.example.com
on port1521
. - Initiates a snapshot of the
employees
table in thesample
schema within theORCLPDB1
pluggable database. - After snapshot completion, the connector listens for changes made to the
employees
table through theXOUT
outbound server. - Streams the changes to the Kafka topic,
cflt.SAMPLE.EMPLOYEES
.
{
"name": "oracle-connector",
"config": {
"connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
"tasks.max" : "1",
"database.hostname": "db.example.com",
"database.port": "1521",
"database.user": "C##CFLTUSER",
"database.password": "secret",
"database.dbname": "ORCLCDB",
"database.service.name": "ORCLCDB",
"database.pdb.name": "ORCLPDB1",
"database.out.server.name": "XOUT",
"table.include.list": "SAMPLE.EMPLOYEES",
"topic.prefix": "cflt",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**************************************************"
}
}
Single Message Transforms: To add a new SMT using the CLI, see Add transforms. For more information about the Debezium SMT ExtractNewRecordState, see Debezium transformations.
See Configuration Properties for all properties and definitions.
Step 4: Load the properties file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file oracle-xstream-cdc-source.json
Example output:
Created connector OracleXStreamSource_0 lcc-ix4dl
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID | Name | Status | Type
+----------+-----------------------+---------+--------+
lcc-ix4dl | OracleXStreamSource_0 | RUNNING | source
Step 6: Check the Kafka topic.¶
After the connector is running, verify that messages are populating your Kafka topic.
If the connector is not running, see Oracle Database Prerequisites for Oracle XStream CDC Source Connector for Confluent Cloud and review the Oracle database configuration information and post-configuration validation steps.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Configuration Properties¶
Use the following configuration properties with the fully-managed Oracle XStream CDC Source connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
Schema Config¶
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
How should we connect to your database?¶
database.hostname
IP address or hostname of the Oracle database server.
- Type: string
- Importance: high
database.port
Port number of the Oracle database server.
- Type: int
- Default: 1521
- Valid Values: [1,…,65535]
- Importance: high
database.user
Name of the Oracle database user to use when connecting to the database.
- Type: string
- Valid Values: Must match the regex
^[^\?=%&\(\)]*$
- Importance: high
database.password
Password of the Oracle database user to use when connecting to the database.
- Type: password
- Importance: high
database.dbname
Name of the database to connect to. In a multitenant container database, this is the name of the container database (CDB).
- Type: string
- Valid Values: Must match the regex
^([a-zA-Z][a-zA-Z0-9$#_]*)*$
- Importance: high
database.service.name
Name of the database service to which to connect. In a multitenant container database, this is the service used to connect to the container database (CDB). For Oracle Real Application Clusters (RAC), use the service created by Oracle XStream.
- Type: string
- Valid Values: Must match the regex
^([a-zA-Z][a-zA-Z0-9$#._]*)*$
- Importance: high
database.pdb.name
Name of the pluggable database to connect to in a multitenant architecture. The container database (CDB) name must be given via
database.dbname
in this case. This configuration should not be specified when connecting to a non-container database.- Type: string
- Valid Values: Must match the regex
^([a-zA-Z][a-zA-Z0-9$#_]*)*$
- Importance: high
database.out.server.name
Name of the XStream outbound server to connect to.
- Type: string
- Importance: high
database.tls.mode
Specifies whether to use Transport Layer Security (TLS) to connect to the Oracle database. Select one of the following options:
disable (default): Does not use a TLS connection.
one-way: Uses a TLS encrypted connection and also verifies the server’s TLS certificate against the configured Certificate Authority (CA) certificates.
- Type: string
- Default: disable
- Valid Values: disable, one-way
- Importance: medium
database.processor.licenses
Specifies the number of Oracle processor licenses required for the source database server or cluster. The is determined by multiplying the total number of processor cores by a core processor licensing factor, as specified in the Oracle Processor Core Factor Table.
- Type: int
- Valid Values: [1,…,4096]
- Importance: medium
Output messages¶
output.key.format
Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured when using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: AVRO
- Valid Values: AVRO, JSON_SR, PROTOBUF
- Importance: high
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured when using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: AVRO
- Valid Values: AVRO, JSON_SR, PROTOBUF
- Importance: high
How should we name your topic(s)?¶
topic.prefix
Topic prefix that provides a namespace for the Oracle database server or cluster from which the connector captures changes. The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events from this connector. Only alphanumeric characters, hyphens, dots and underscores are accepted.
Warning: Do not change the value of this property. If you change the value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value. The connector is also unable to recover its database schema history topic.
- Type: string
- Importance: high
Connector configuration¶
table.include.list
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you want to capture. When this property is set, the connector will only capture changes from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes from all non-system tables in each captured database. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the
table.exclude.list
property.- Type: string
- Importance: high
table.exclude.list
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you do not want to capture. When this property is set, the connector captures changes from any table that is not specified in the exclude list. Each identifier is of the form schemaName.tableName. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the
table.include.list
property.- Type: string
- Importance: high
snapshot.mode
The criteria for running a snapshot upon startup of the connector. Select one of the following snapshot options:
initial (default): The snapshot includes both the structure (schema) and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
no_data: The snapshot includes only the structure (schema) of captured tables. Specify this value if you want the connector to capture data only for changes that occur after the snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
recovery: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. WARNING: Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.
- Type: string
- Default: initial
- Valid Values: initial, no_data, recovery
- Importance: medium
schema.history.internal.skip.unparseable.ddl
A boolean value that specifies whether the connector should ignore a DDL statement that cannot be parsed or stop processing for a human to address the issue. The safe default is false which causes the connector to fail when it encounters an unparseable DDL statement. Setting the value to true should be done with care as it will cause the connector to skip processing any DDL statement it cannot parse, and this could potentially lead to schema mismatches and data loss.
- Type: boolean
- Default: false
- Importance: low
snapshot.database.errors.max.retries
Specifies the number of retry attempts the connector will make to snapshot a table if a database error occurs. This configuration property currently only retries failures related to ORA-01466 error. By default, no retries are attempted.
- Type: int
- Default: 0
- Valid Values: [0,…,3]
- Importance: low
tombstones.on.delete
Controls whether a delete event is followed by a tombstone event. The following values are possible:
true: For each delete operation, the connector emits a delete event and a subsequent tombstone event.
false: For each delete operation, the connector emits only a delete event.
After a source record is deleted, a tombstone event (the default behavior) enables Kafka to completely delete all events that share the key of the deleted row in topics that have log compaction enabled.
- Type: boolean
- Default: true
- Importance: medium
skipped.operations
A comma-separated list of operations to skip during streaming. You can configure the connector to skip the following types of operations: c (inserts/create), u (updates), d (deletes), t (truncates), and none to indicate nothing is skipped. The default value is t, ensuring that only truncate operations are skipped.
- Type: string
- Default: t
- Importance: low
schema.name.adjustment.mode
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (the default) does not apply any adjustment.
avro replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: low
field.name.adjustment.mode
Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (the default) does not apply any adjustment.
avro replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: low
heartbeat.interval.ms
Controls how often the connector sends heartbeat messages to a heartbeat topic. It is useful in situations when no changes occur in the captured tables for an extended period. In such cases, there are no change event messages generated, causing the committed source offset to remain unchanged. As a result, the connector is unable to update the processed low watermark on the outbound server which could result in the database retaining archived redo log files longer than needed. The default value is 0 which disables the heartbeat mechanism.
- Type: int
- Default: 0
- Valid Values: [0,…]
- Importance: medium
database.os.timezone
Specifies the database server’s operating system timezone. This is used to read the time when the LCR was generated at the source database. The default timezone is UTC. The value has to be a valid java.time.ZoneId identifier.
- Type: string
- Default: UTC
- Importance: low
How should we handle data types?¶
decimal.handling.mode
Specifies how the connector should handle NUMBER, DECIMAL and NUMERIC columns. You can set one of the following options:
precise (the default): Uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect’s org.apache.kafka.connect.data.Decimal type. Depending on the precision and scale, the most appropriate Kafka Connect integer type is used for integral values, ensuring that the value is represented without any loss of precision.
string: Encodes values as formatted strings. Using the string option is easier to consume, but results in a loss of semantic information about the real type.
double: Represents values using Java’s double. Using double values is easier, but can result in a loss of precision.
- Type: string
- Default: precise
- Valid Values: double, precise, string
- Importance: medium
time.precision.mode
Specifies how the connector should handle time, date, and timestamp columns. You can set one of the following options:
adaptive (the default): Bases the precision of time, date, and timestamp values on the database column’s precision.
connect: always represents time, date, and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which uses millisecond precision regardless of the database columns’ precision.
- Type: string
- Default: adaptive
- Valid Values: adaptive, connect
- Importance: medium
Number of tasks for this connector¶
tasks.max
Specifies the maximum number of tasks for the connector. Since this connector supports only a single task, the maximum is capped at 1.
- Type: int
- Default: 1
- Valid Values: [1,…,1]
- Importance: high
Auto-restart policy¶
auto.restart.on.user.error
Enable connector to automatically restart on user-actionable errors.
- Type: boolean
- Default: true
- Importance: medium
Additional Configs¶
header.converter
The converter class for the headers. This is used to serialize and deserialize the headers of the messages.
- Type: string
- Importance: low
producer.override.compression.type
The compression type for all data generated by the producer.
- Type: string
- Importance: low
value.converter.allow.optional.map.keys
Allow optional string map key when converting from Connect Schema to Avro Schema. Applicable for Avro Converters.
- Type: boolean
- Importance: low
value.converter.auto.register.schemas
Specify if the Serializer should attempt to register the Schema.
- Type: boolean
- Importance: low
value.converter.connect.meta.data
Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.
- Type: boolean
- Importance: low
value.converter.enhanced.avro.schema.support
Enable enhanced schema support to preserve package information and Enums. Applicable for Avro Converters.
- Type: boolean
- Importance: low
value.converter.enhanced.protobuf.schema.support
Enable enhanced schema support to preserve package information. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.flatten.unions
Whether to flatten unions (oneofs). Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.generate.index.for.unions
Whether to generate an index suffix for unions. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.generate.struct.for.nulls
Whether to generate a struct variable for null values. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.int.for.enums
Whether to represent enums as integers. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.latest.compatibility.strict
Verify latest subject version is backward compatible when use.latest.version is true.
- Type: boolean
- Importance: low
value.converter.object.additional.properties
Whether to allow additional properties for object schemas. Applicable for JSON_SR Converters.
- Type: boolean
- Importance: low
value.converter.optional.for.nullables
Whether nullable fields should be specified with an optional label. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.optional.for.proto2
Whether proto2 optionals are supported. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.use.latest.version
Use latest version of schema in subject for serialization when auto.register.schemas is false.
- Type: boolean
- Importance: low
value.converter.use.optional.for.nonrequired
Whether to set non-required properties to be optional. Applicable for JSON_SR Converters.
- Type: boolean
- Importance: low
value.converter.wrapper.for.nullables
Whether nullable fields should use primitive wrapper messages. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.wrapper.for.raw.primitives
Whether a wrapper message should be interpreted as a raw primitive at root level. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
key.converter.key.subject.name.strategy
How to construct the subject name for key schema registration.
- Type: string
- Default: TopicNameStrategy
- Importance: low
value.converter.decimal.format
Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:
BASE64 to serialize DECIMAL logical types as base64 encoded binary data and
NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
- Type: string
- Default: BASE64
- Importance: low
value.converter.flatten.singleton.unions
Whether to flatten singleton unions. Applicable for Avro and JSON_SR Converters.
- Type: boolean
- Default: false
- Importance: low
value.converter.reference.subject.name.strategy
Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.
- Type: string
- Default: DefaultReferenceSubjectNameStrategy
- Importance: low
value.converter.value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry.
- Type: string
- Default: TopicNameStrategy
- Importance: low
Connect to an Oracle Real Application Cluster (RAC) Database¶
Confluent recommends configuring the following properties to ensure that the connector will be able to connect and attach to the specific RAC instance running the XStream components:
Configure the
database.hostname
property to the Oracle RAC database SCAN address.Note
If a SCAN address is unavailable, configure the
database.hostname
property to the hostname of the instance where the XStream components are running. You will need to manually reconfigure the connector whenever the instance running the XStream components changes.Configure the
database.service.name
property to the auto-created Oracle XStream service.
Supported Data Types¶
The connector creates change events for database changes. Each change event mirrors the table’s schema, with a field for every column value. The data type of each table column determines how the connector represents the column values in the corresponding change event fields.
For certain data types, such as numeric data types, you can customize how the connector maps them by modifying the default configuration settings. This allows more control over handling various data types, ensuring that the change events reflect the desired format and meet specific requirements.
Character data types¶
The following table describes how the connector maps character types.
Oracle data type | Connect type |
---|---|
CHAR | STRING |
VARCHAR / VARCHAR2 | STRING |
NCHAR | STRING |
NVARCHAR | STRING |
In all cases, the connector ensures that character data is converted to a string type in Kafka Connect when creating change events.
Numeric data types¶
You can adjust how the connector maps numeric data types by changing the decimal.handling.mode
configuration property.
The table below shows the mapping of numeric types when decimal.handling.mode
is set to precise
.
Oracle data type | Connect type | Notes |
---|---|---|
NUMBER(P, S <= 0) | INT8 / INT16 / INT32 / INT64 / BYTES | Based on the precision and scale, the connector selects a matching Kafka Connect integer type:
NUMBER columns with a scale of |
NUMBER(P, S > 0) | BYTES | org.apache.kafka.connect.data.Decimal |
NUMBER(P, [, * ]) | STRUCT | io.debezium.data.VariableScaleDecimal Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form. |
SMALLINT, INT, INTEGER | BYTES | org.apache.kafka.connect.data.Decimal Oracle maps SMALLINT, INT and INTEGER to NUMBER(38,0). As a result, these types can hold values that exceed the maximum range of any of the INT types. |
NUMERIC, DECIMAL | INT8 / INT16 / INT32 / INT64 / BYTES | Handles in the same way as the NUMBER data type (note that scale defaults to 0 for NUMERIC). |
FLOAT[(P)] Maps to FLOAT(126) when P not mentioned |
STRUCT | io.debezium.data.VariableScaleDecimal Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form. |
REAL - Maps to FLOAT(63) DOUBLE PRECISION - Maps to FLOAT(126) |
STRUCT | io.debezium.data.VariableScaleDecimal Contains a structure with two fields: scale (of type INT32) that contains the scale of the transferred value, and value (of type BYTES) containing the original value in an unscaled form. |
BINARY_FLOAT | FLOAT32 | |
BINARY_DOUBLE | FLOAT64 |
Note
When decimal.handling.mode
is set to:
- string: The Oracle numeric data types are mapped to the Kafka Connect
STRING
type. - double: The Oracle numeric data types are mapped to the Kafka Connect
FLOAT64
type.
Temporal data types¶
You can adjust how the connector maps some of the temporal data types by changing the time.precision.mode
configuration property.
The table below shows the mapping of temporal types:
Oracle data type | Connect type | Notes |
---|---|---|
DATE | INT64 | Based on
|
TIMESTAMP[(P)] | INT64 | Based on adaptive: If precision <= 3: io.debezium.time.Timestamp
Else if precision <= 6: io.debezium.time.MicroTimestamp
Else: io.debezium.time.NanoTimestamp
connect: org.apache.kafka.connect.data.Timestamp Represents the number of milliseconds since the UNIX epoch, without timezone information. |
TIMESTAMP WITH TIMEZONE | STRING | io.debezium.time.ZonedTimestamp A string representation of a timestamp with timezone information. |
TIMESTAMP WITH LOCAL TIMEZONE | STRING | io.debezium.time.ZonedTimestamp A string representation of a timestamp in UTC. |
INTERVAL YEAR[(P)] TO MONTH | STRING | io.debezium.time.Interval A string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S. |
INTERVAL DAY[(P)] TO SECOND[(FP)] | STRING | io.debezium.time.Interval A string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S. |
Note
When time.precision.mode
is set to connect
, there could be a loss of precision if the
fractional second precision of a column exceeds 3
, because Oracle supports a higher level
of precision than the logical types in Kafka Connect.
Security¶
Native Network Encryption¶
Oracle database provides native network encryption and integrity to ensure data is secure during transit, without the need for setting up Transport Layer Security (TLS).
Encryption and integrity are managed based on a combination of client-side and server-side
encryption settings, which can be configured using parameters in the sqlnet.ora
configuration
file. For more information on configuring Oracle Advanced Security for network encryption and
integrity, see Support for Network Encryption and Integrity section in the Oracle
Database JDBC Developer’s Guide.
The connector uses the Oracle JDBC OCI driver to communicate with the Oracle database. You can use the database’s native network encryption and data integrity to securely transmit data between the connector and the Oracle database. For more information on relevant configuration settings, see Table 9-2 OCI Driver Client Parameters for Encryption and Integrity in the Oracle Database JDBC Developer’s Guide.
The following configurations are set on the connector:
SQLNET.ALLOW_WEAK_CRYPTO
parameter is set to FALSE to ensure that the connector uses strong algorithms when communicating with the Oracle database.SQLNET.ENCRYPTION_CLIENT
andSQLNET.CRYPTO_CHECKSUM_CLIENT
parameters are set to ACCEPTED (the default value).
To enable network encryption and integrity, configure the SQLNET.ENCRYPTION_SERVER
and
SQLNET.CRYPTO_CHECKSUM_SERVER
parameters on the server to either REQUESTED or REQUIRED.
Additionally, specify strong encryption and crypto-checksum algorithms by setting the
SQLNET.ENCRYPTION_TYPES_SERVER
and SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER
parameters.
For more information, see Improving Native Network Encryption Security
section in the Oracle Database Security Guide.
Transport Layer Security (TLS)¶
You can configure Transport Layer Security (TLS) to secure connections between the client (connector) and the Oracle database. Currently, only one-way TLS, without client wallets, is supported.
In one-way TLS, the database server presents a certificate to authenticate itself to the client (connector). The client needs access to the trusted Certificate Authority (CA) root certificate that signed the server’s certificate to verify it. Currently, the connector only supports certificates signed by well-known CAs, where the corresponding CA certificate is present in the default certificate store of the system running the connector.
For more information on configuring TLS, see Configuring Transport Layer Security Authentication chapter of the Oracle Database Security Guide.
You can enable TLS connections between the connector and the Oracle database server by
using the database.tls.mode
configuration property. Set database.tls.mode
to one-way
to enable TLS encryption and server authentication.
Note
When database.tls.mode
is set to one-way, ensure that the port specified in
database.port
corresponds to the listener on the server that supports TLS connections.
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.