Quick Start for Oracle XStream CDC Source Connector for Confluent Platform¶
The self-managed Oracle XStream CDC Source connector for Confluent Platform captures all changes made to rows in an Oracle database and represents the changes as change event records in Apache Kafka® topics. The connector can be configured to capture changes from a subset of tables in a database by using an include regular expression to match the table identifiers. It can also be configured to not capture tables that match a separate exclude regular expression.
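The include and exclude filtering described above can be sketched as follows. This is an illustration only: it assumes each pattern must match the whole `SCHEMA.TABLE` identifier and that an exclude match overrides an include match. The `is_captured` helper and the sample table names are hypothetical and are not part of the connector.

```python
import re
from typing import Optional


def is_captured(table_id: str, include: str, exclude: Optional[str] = None) -> bool:
    """Return True if table_id passes the include filter and is not excluded.

    Assumption: patterns are anchored to the full identifier (re.fullmatch).
    """
    if re.fullmatch(include, table_id) is None:
        return False
    if exclude is not None and re.fullmatch(exclude, table_id) is not None:
        return False
    return True


# Hypothetical table identifiers used only for this illustration.
tables = ["SAMPLE.EMPLOYEES", "SAMPLE.DEPARTMENTS", "SAMPLE.AUDIT_LOG", "HR.EMPLOYEES"]
captured = [t for t in tables if is_captured(t, r"SAMPLE\..*", r"SAMPLE\.AUDIT_.*")]
print(captured)  # ['SAMPLE.EMPLOYEES', 'SAMPLE.DEPARTMENTS']
```

Testing patterns this way before deploying can help catch an include expression that is broader than intended.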
Install Oracle XStream CDC Connector¶
The following section provides the steps to install the Oracle XStream CDC connector.
Prerequisites¶
Before you proceed with the Oracle XStream CDC connector installation, ensure you do the following:
Complete Oracle Database Prerequisites
Note
Download and execute the orclcdc_readiness.sql PL/SQL script to validate the completion of prerequisites and ensure that your database is properly configured to work with the Oracle XStream CDC Source connector.
Prepare the Connector¶
The connector uses the XStream OUT Java API, which in turn requires the Oracle OCI (thick) JDBC driver. The OCI driver requires an Oracle client installation of the same version as the JDBC driver. It is recommended to use the OCI Instant Client, which does not require a complete Oracle client installation and includes the appropriate JDBC driver version.
Install Oracle Instant Client¶
Follow the steps below to install and configure the Oracle Instant Client:
- Download the Oracle Instant Client package for your OS and platform from the Oracle Technology Network website.
- Extract the Instant Client shared libraries and Oracle JDBC class libraries to a directory, such as instantclient.
- (Linux only) Install the libaio package for your operating system. On certain Linux distributions, it may be referred to as libaio1.
- Set the library path environment variable to the directory from the previous step. For example, on UNIX systems, set the LD_LIBRARY_PATH environment variable to instantclient.
For more information about Oracle Instant Client, see Oracle documentation.
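As a quick sanity check for the library path step, the sketch below lists which entries of `LD_LIBRARY_PATH` contain an Oracle client shared library. It assumes a Linux system where the Instant Client library file name starts with `libclntsh.so`. The `find_instant_client` helper is hypothetical and is not shipped with the connector or with Oracle Instant Client.

```python
import os
from pathlib import Path


def find_instant_client(library_path: str, marker: str = "libclntsh.so") -> list:
    """Return the entries of library_path that contain a file starting with marker."""
    hits = []
    for entry in library_path.split(os.pathsep):
        directory = Path(entry)
        # Skip empty entries and entries that are not existing directories.
        if not entry or not directory.is_dir():
            continue
        if any(p.name.startswith(marker) for p in directory.iterdir()):
            hits.append(entry)
    return hits


# Inspect the current environment; an empty list suggests the path is not set up yet.
print(find_instant_client(os.environ.get("LD_LIBRARY_PATH", "")))
```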
Install the connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- An installation of the Confluent CLI.
- An installation of the latest (latest) connector version.
Install the connector using the Confluent CLI¶
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-oracle-xstream-cdc-source:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-oracle-xstream-cdc-source:1.0.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Add the connector dependencies¶
- Add the ojdbc8.jar and xstreams.jar libraries from the Oracle Instant Client installation to the plugin lib directory as required dependencies for the connector.
- (Optional) Add the orai18n.jar library from Oracle to the plugin lib directory as a dependency for the connector if Oracle database internationalization (i18n) support is required.
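To confirm that the dependencies are in place, a small check like the following could be run against the plugin lib directory. The jar names come from the steps above. The `missing_jars` helper and the example path are hypothetical, so adjust the path to your own installation.

```python
from pathlib import Path

# Jar names taken from the dependency steps in this guide.
REQUIRED_JARS = ("ojdbc8.jar", "xstreams.jar")
OPTIONAL_JARS = ("orai18n.jar",)  # only needed for i18n support


def missing_jars(plugin_lib_dir, required=REQUIRED_JARS):
    """Return the required jar names that are not present in plugin_lib_dir."""
    lib = Path(plugin_lib_dir)
    return [name for name in required if not (lib / name).is_file()]


# Hypothetical location of the connector's lib directory.
print(missing_jars("/usr/share/confluent-hub-components/oracle-xstream/lib"))
```

An empty list means both required jars were found; any names printed still need to be copied from the Instant Client installation.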
Deploy the connector¶
You can deploy the connector using the CLI, API, or UI. The configuration differs slightly depending on which of the following two database architectures you connect to:
Container Database (CDB)¶
Load the connector by passing a .json configuration file.
confluent local services connect connector load <name-of-connector> --config <path-to-config-file>
The connector setup below performs the following:
- Connects to the ORCLPDB1 pluggable database located at db.example.com on port 1521.
- Initiates a snapshot of the employees table in the sample schema within the ORCLPDB1 pluggable database.
- After snapshot completion, the connector listens for changes made to the employees table through the XOUT outbound server.
- Streams the changes to the Kafka topic, cflt.SAMPLE.EMPLOYEES.
{
  "name": "oracle-connector",
  "config": {
    "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
    "tasks.max": "1",
    "database.hostname": "db.example.com",
    "database.port": "1521",
    "database.user": "C##CFLTUSER",
    "database.password": "secret",
    "database.dbname": "ORCLCDB",
    "database.service.name": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.out.server.name": "XOUT",
    "topic.prefix": "cflt",
    "table.include.list": "SAMPLE.EMPLOYEES",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
  }
}
Note
Note that these are the minimum configurations required to run the connector. For detailed configurations, see configuration properties.
Verify the connector configurations.
confluent local services connect connector config <name-of-connector>
The output should be similar to:
{ "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector", "name": "oracle-connector", "tasks.max": "1", "database.hostname": "db.example.com", "database.port": "1521", "database.user": "C##CFLTUSER", "database.password": "secret", "database.dbname": "ORCLCDB", "database.service.name": "ORCLCDB", "database.pdb.name": "ORCLPDB1", "database.out.server.name": "XOUT", "topic.prefix": "cflt", "table.include.list": "SAMPLE.EMPLOYEES", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092" }
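The CLI steps above have an API counterpart. As a rough sketch, assuming the Kafka Connect REST API is reachable at http://localhost:8083 (a hypothetical address that does not come from this guide), the same CDB configuration could be wrapped in a `POST /connectors` request. The `build_create_request` helper is illustrative, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_create_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Same values as the CDB example configuration in this section.
config = {
    "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
    "tasks.max": "1",
    "database.hostname": "db.example.com",
    "database.port": "1521",
    "database.user": "C##CFLTUSER",
    "database.password": "secret",
    "database.dbname": "ORCLCDB",
    "database.service.name": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.out.server.name": "XOUT",
    "topic.prefix": "cflt",
    "table.include.list": "SAMPLE.EMPLOYEES",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
}

# Assumed Connect worker address; replace with your own.
request = build_create_request("http://localhost:8083", "oracle-connector", config)
print(request.get_method(), request.full_url)
# To submit the connector, pass the request to urllib.request.urlopen(request).
```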
Navigate to Control Center (Legacy) at http://localhost:9021. It may take a minute or two for Control Center (Legacy) to start and load.
Click the controlcenter.cluster tile.
In the navigation menu, click Connect.
Click the connect-default cluster in the Connect clusters list.
Click Add connector to start creating a connector.
Tip
To see source connectors only, click Filter by category and select Sources.
Select the OracleXStreamSourceConnector tile.
Note
You can also click Upload connector config file and select a .json file from your local machine containing the connector configuration. For example:
{ "name": "oracle-connector", "config": { "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector", "tasks.max": "1", "database.hostname": "db.example.com", "database.port": "1521", "database.user": "C##CFLTUSER", "database.password": "secret", "database.dbname": "ORCLCDB", "database.service.name": "ORCLCDB", "database.pdb.name": "ORCLPDB1", "database.out.server.name": "XOUT", "topic.prefix": "cflt", "table.include.list": "SAMPLE.EMPLOYEES", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092" } }
Enter the following configuration values in the corresponding sections:
Name section: oracle-connector
Common section:
- Tasks max: 1
- Key converter class: io.confluent.connect.avro.AvroConverter
- Value converter class: io.confluent.connect.avro.AvroConverter
Oracle Configuration section:
- Hostname: db.example.com
- Port: 1521
- User: C##CFLTUSER
- Password: secret
- Database: ORCLCDB
- Database service name: ORCLCDB
- PDB name: ORCLPDB1
- XStream out server name: XOUT
Connector configuration section:
- Topic prefix: cflt
- Include Tables: SAMPLE.EMPLOYEES
History Storage section:
- Database schema history topic name: __orcl-schema-changes.cflt
- Kafka broker addresses: kafka:9092
Additional Properties section:
- key.converter.schema.registry.url: http://schema-registry:8081
- value.converter.schema.registry.url: http://schema-registry:8081
Note
Note that these are the minimum configurations required to run the connector. For detailed configurations, see configuration properties.
Click Next to review the connector configuration. If you are satisfied with the settings, click Launch. Otherwise, click Back and modify the settings.
Non-Container Database (Non-CDB)¶
Note
The non-CDB architecture is deprecated in Oracle database 12c and desupported in Oracle 20c.
Load the connector by passing a .json configuration file.
confluent local services connect connector load <name-of-connector> --config <path-to-config-file>
The connector setup below performs the following:
- Connects to the database located at db.example.com on port 1521.
- Initiates a snapshot of the employees table in the sample schema within the ORCLCDB database.
- After snapshot completion, the connector listens for changes made to the employees table through the XOUT outbound server.
- Streams the changes to the Kafka topic, cflt.SAMPLE.EMPLOYEES.
{
  "name": "oracle-connector",
  "config": {
    "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
    "tasks.max": "1",
    "database.hostname": "db.example.com",
    "database.port": "1521",
    "database.user": "CFLTUSER",
    "database.password": "secret",
    "database.dbname": "ORCLCDB",
    "database.service.name": "ORCLCDB",
    "database.out.server.name": "XOUT",
    "topic.prefix": "cflt",
    "table.include.list": "SAMPLE.EMPLOYEES",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
  }
}
Note
Note that these are the minimum configurations required to run the connector. For detailed configurations, see configuration properties.
Verify the connector configurations.
confluent local services connect connector config <name-of-connector>
The output should be similar to:
{ "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector", "name": "oracle-connector", "tasks.max": "1", "database.hostname": "db.example.com", "database.port": "1521", "database.user": "CFLTUSER", "database.password": "secret", "database.dbname": "ORCLCDB", "database.service.name": "ORCLCDB", "database.out.server.name": "XOUT", "topic.prefix": "cflt", "table.include.list": "SAMPLE.EMPLOYEES", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092" }
Navigate to Control Center (Legacy) at http://localhost:9021. It may take a minute or two for Control Center (Legacy) to start and load.
Click the controlcenter.cluster tile.
In the navigation menu, click Connect.
Click the connect-default cluster in the Connect clusters list.
Click Add connector to start creating a connector.
Tip
To see source connectors only, click Filter by category and select Sources.
Select the OracleXStreamSourceConnector tile.
Note
You can also click Upload connector config file and select a .json file from your local machine containing the connector configuration. For example:
{ "name": "oracle-connector", "config": { "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector", "tasks.max": "1", "database.hostname": "db.example.com", "database.port": "1521", "database.user": "CFLTUSER", "database.password": "secret", "database.dbname": "ORCLCDB", "database.service.name": "ORCLCDB", "database.out.server.name": "XOUT", "topic.prefix": "cflt", "table.include.list": "SAMPLE.EMPLOYEES", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092" } }
Enter the following configuration values in the corresponding sections:
Name section: oracle-connector
Common section:
- Tasks max: 1
- Key converter class: io.confluent.connect.avro.AvroConverter
- Value converter class: io.confluent.connect.avro.AvroConverter
Oracle Configuration section:
- Hostname: db.example.com
- Port: 1521
- User: CFLTUSER
- Password: secret
- Database: ORCLCDB
- Database service name: ORCLCDB
- XStream out server name: XOUT
Connector configuration section:
- Topic prefix: cflt
- Include Tables: SAMPLE.EMPLOYEES
History Storage section:
- Database schema history topic name: __orcl-schema-changes.cflt
- Kafka broker addresses: kafka:9092
Additional Properties section:
- key.converter.schema.registry.url: http://schema-registry:8081
- value.converter.schema.registry.url: http://schema-registry:8081
Note
Note that these are the minimum configurations required to run the connector. For detailed configurations, see configuration properties.
Click Next to review the connector configuration. If you are satisfied with the settings, click Launch. Otherwise, click Back and modify the settings.
Connect to an Oracle Real Application Cluster (RAC) Database¶
Confluent recommends configuring the following properties to ensure that the connector will be able to connect and attach to the specific RAC instance running the XStream components:
- Configure the database.hostname property to the Oracle RAC database SCAN address.
Note
If a SCAN address is unavailable, configure the database.hostname property to the hostname of the instance where the XStream components are running. You will need to manually reconfigure the connector whenever the instance running the XStream components changes.
- Configure the database.service.name property to the auto-created Oracle XStream service.
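The two RAC recommendations above amount to overriding two properties of an existing configuration. The sketch below illustrates that with placeholder values: the SCAN address `rac-scan.example.com` and the service name `XOUT_SERVICE_PLACEHOLDER` are hypothetical and must be replaced with the values from your own RAC environment. The `with_rac_settings` helper is likewise illustrative.

```python
def with_rac_settings(base_config, scan_address, xstream_service):
    """Return a copy of base_config pointing at a RAC SCAN address and XStream service."""
    config = dict(base_config)  # copy so the original configuration is left untouched
    config["database.hostname"] = scan_address
    config["database.service.name"] = xstream_service
    return config


# Subset of the example configuration from this guide.
base = {
    "database.hostname": "db.example.com",
    "database.port": "1521",
    "database.service.name": "ORCLCDB",
    "database.out.server.name": "XOUT",
}

# Placeholder values, not real defaults.
rac = with_rac_settings(base, "rac-scan.example.com", "XOUT_SERVICE_PLACEHOLDER")
print(rac["database.hostname"], rac["database.service.name"])
```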
Security¶
Using Native Network Encryption¶
Oracle database provides native network encryption and integrity to ensure data is secure during transit, without the need for setting up Transport Layer Security (TLS).
Encryption and integrity are managed based on a combination of client-side and server-side encryption settings, which can be configured using parameters in the sqlnet.ora configuration file. For more information on configuring Oracle Advanced Security for network encryption and integrity, see Support for Network Encryption and Integrity.
The connector uses the Oracle JDBC OCI driver to communicate with the Oracle database. You can use the database’s native network encryption and data integrity to securely transmit data between the connector and the Oracle database. Relevant configuration settings can be found in Table 9-2 OCI Driver Client Parameters for Encryption and Integrity.
To enable network encryption and integrity, configure the following parameters:
- On the client: SQLNET.ENCRYPTION_CLIENT and SQLNET.CRYPTO_CHECKSUM_CLIENT
- On the server: SQLNET.ENCRYPTION_SERVER and SQLNET.CRYPTO_CHECKSUM_SERVER
Additionally, specify strong encryption and crypto-checksum algorithms using:
- On the client: SQLNET.ENCRYPTION_TYPES_CLIENT and SQLNET.CRYPTO_CHECKSUM_TYPES_CLIENT
- On the server: SQLNET.ENCRYPTION_TYPES_SERVER and SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER
For more information, see Improving Native Network Encryption Security in the Oracle Database Security Guide.
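To make the client-side parameters concrete, the sketch below renders them as sqlnet.ora lines. The parameter names come from the lists above. The values (`REQUIRED`, `AES256`, `SHA256`) are one plausible choice and an assumption of this example, not a recommendation from this guide, so check the Oracle Database Security Guide for the values appropriate to your environment. The `render_sqlnet` helper is hypothetical.

```python
# Client-side parameters named in this section, with assumed example values.
CLIENT_SETTINGS = {
    "SQLNET.ENCRYPTION_CLIENT": "REQUIRED",
    "SQLNET.ENCRYPTION_TYPES_CLIENT": "(AES256)",
    "SQLNET.CRYPTO_CHECKSUM_CLIENT": "REQUIRED",
    "SQLNET.CRYPTO_CHECKSUM_TYPES_CLIENT": "(SHA256)",
}


def render_sqlnet(settings):
    """Render settings as 'NAME = VALUE' lines suitable for a sqlnet.ora file."""
    return "".join(f"{name} = {value}\n" for name, value in settings.items())


print(render_sqlnet(CLIENT_SETTINGS), end="")
```

The server side would carry the matching `_SERVER` parameters in its own sqlnet.ora file.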
Using Transport Layer Security (TLS)¶
Transport Layer Security (TLS) can be configured to secure connections between the client (connector) and the Oracle database. Currently, only one-way TLS without client wallets is supported.
In one-way TLS, the database server presents a certificate to authenticate itself to the client (connector). The client needs access to the trusted Certificate Authority (CA) root certificate that signed the server’s certificate to verify it. Currently, the connector only supports certificates signed by well-known CAs, where the corresponding CA certificate is present in the default certificate store of the system running the connector.
For more information on configuring TLS, see Configuring Transport Layer Security Authentication chapter of the Oracle Database Security Guide.
You can enable TLS connections between the connector and the Oracle database server by using the database.tls.mode configuration property:
- Set database.tls.mode to one-way to enable TLS encryption and server authentication.
Note
When database.tls.mode is set to one-way, ensure that the port specified in database.port corresponds to the listener on the server that supports TLS connections.
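A minimal sketch of the resulting configuration change follows. It sets `database.tls.mode` to `one-way` and switches `database.port` to a TLS listener port. Port 2484 is used purely as an example and is an assumption, so use whatever port your database listener exposes for TLS. The `with_one_way_tls` helper is hypothetical.

```python
def with_one_way_tls(base_config, tls_port):
    """Return a copy of base_config with one-way TLS enabled on the given listener port."""
    config = dict(base_config)  # leave the original configuration unchanged
    config["database.tls.mode"] = "one-way"
    config["database.port"] = str(tls_port)  # connector properties are passed as strings
    return config


# Subset of the example configuration from this guide.
base = {"database.hostname": "db.example.com", "database.port": "1521"}

# 2484 is an assumed TLS listener port for illustration only.
tls_config = with_one_way_tls(base, 2484)
print(tls_config)
```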