Quick Start for Oracle XStream CDC Source Connector for Confluent Platform

The self-managed Oracle XStream CDC Source connector for Confluent Platform captures all changes made to rows in an Oracle database and represents the changes as change event records in Apache Kafka® topics. The connector can be configured to capture changes from a subset of tables in a database by using an include regular expression to match the table identifiers. It can also be configured to not capture tables that match a separate exclude regular expression.
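As a rough illustration of how an include or an exclude expression narrows the captured set, the sketch below filters a made-up list of table identifiers with grep. It assumes the expression is matched against the whole SCHEMA.TABLE identifier, and it uses POSIX extended regular expressions, which only approximate the regular expressions the connector evaluates. The table names and both patterns are hypothetical.

```shell
# Hypothetical table identifiers in SCHEMA.TABLE form.
tables='SAMPLE.EMPLOYEES
SAMPLE.DEPARTMENTS
SAMPLE.AUDIT_LOG
HR.EMPLOYEES'

# Hypothetical include pattern: capture only these two SAMPLE tables.
include_regex='SAMPLE\.(EMPLOYEES|DEPARTMENTS)'
# Hypothetical exclude pattern: capture everything except audit log tables.
exclude_regex='.*\.AUDIT_LOG'

# -x requires the whole identifier to match (an assumption about the connector).
included=$(printf '%s\n' "$tables" | grep -Ex "$include_regex")
# -v inverts the match, leaving the tables that the exclude pattern does not hit.
not_excluded=$(printf '%s\n' "$tables" | grep -Evx "$exclude_regex")

echo "Captured with the include pattern:"
printf '%s\n' "$included"
echo "Captured with the exclude pattern:"
printf '%s\n' "$not_excluded"
```

The two patterns are applied separately here because the text describes them as separate options.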

Install Oracle XStream CDC Connector

The following section provides the steps to install the Oracle XStream CDC connector.

Prerequisites

Before you proceed with the Oracle XStream CDC connector installation, ensure you do the following:

Prepare the Connector

The connector uses the XStream OUT Java API, which in turn requires the Oracle OCI (thick) JDBC driver. The OCI driver requires an Oracle client installation of exactly the same version as the JDBC driver. The recommended option is the OCI Instant Client, which does not require a complete Oracle client installation and includes the appropriate JDBC driver version.

Install Oracle Instant Client

Follow the steps below to install and configure the Oracle Instant Client:

  1. Download the Oracle Instant Client package for your OS and platform from the Oracle Technology Network website.
  2. Extract the Instant Client shared libraries and Oracle JDBC class libraries to a directory, such as instantclient.
  3. (Linux only) Install the libaio package for your operating system. On certain Linux distributions, it may be referred to as libaio1.
  4. Set the library path environment variable to the directory from step 2. For example, on UNIX systems, set the LD_LIBRARY_PATH environment variable to instantclient.

For more information about Oracle Instant Client, see Oracle documentation.
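The last step can be sketched as follows on a Linux shell. The install location /opt/oracle/instantclient is a hypothetical default chosen for the example, and INSTANT_CLIENT_DIR is a helper variable introduced here, not something Oracle or Confluent defines.

```shell
# Hypothetical install location; replace it with the directory you extracted to.
INSTANT_CLIENT_DIR="${INSTANT_CLIENT_DIR:-/opt/oracle/instantclient}"

# Prepend the directory while keeping any entries that are already set.
export LD_LIBRARY_PATH="${INSTANT_CLIENT_DIR}${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}"

echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}"
```

The variable needs to be visible in the environment of the Connect worker process, so it is typically exported before the worker starts.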

Install the connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • An installation of the Confluent CLI.
  • An installation of the latest connector version.

Install the connector using the Confluent CLI

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-oracle-xstream-cdc-source:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-oracle-xstream-cdc-source:1.0.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Add the connector dependencies

  1. Add the ojdbc8.jar and xstreams.jar libraries from the Oracle Instant Client installation to the plugin lib directory as required dependencies for the connector.
  2. (Optional) Add the orai18n.jar library from Oracle to the plugin lib directory as a dependency for the connector if Oracle database internationalization (i18n) support is required.
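A minimal sketch of these two steps as a shell function is shown below. The function name add_oracle_jars and its two arguments are hypothetical helpers for illustration. It also assumes that orai18n.jar, if you need it, has been placed in the same directory as the other two libraries.

```shell
# Copy the Oracle JARs the connector needs into the plugin's lib directory.
# $1: Instant Client directory, $2: connector plugin lib directory.
add_oracle_jars() {
  client_dir=$1
  plugin_lib=$2

  # Required libraries: stop with an error if either one is missing.
  for jar in ojdbc8.jar xstreams.jar; do
    if [ ! -f "$client_dir/$jar" ]; then
      echo "missing required library: $client_dir/$jar" >&2
      return 1
    fi
    cp "$client_dir/$jar" "$plugin_lib/"
  done

  # Optional library for internationalization support: copy only if present.
  if [ -f "$client_dir/orai18n.jar" ]; then
    cp "$client_dir/orai18n.jar" "$plugin_lib/"
  fi
}
```

For example, add_oracle_jars /opt/oracle/instantclient <plugin-directory>/lib, where both paths are placeholders. Kafka Connect generally loads plugin libraries when the worker starts, so restart the worker after changing the lib directory.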

Deploy the connector

You can deploy the connector using the CLI, API, or UI. The examples below cover both Oracle database architectures:

Container Database (CDB)

  1. Load the connector by passing a .json configuration file.

    confluent local services connect connector load <name-of-connector> --config <path-to-config-file>
    

    The connector setup below performs the following:

    • Connects to the ORCLPDB1 pluggable database located at db.example.com on port 1521.
    • Initiates a snapshot of the employees table in the sample schema within the ORCLPDB1 pluggable database.
    • After snapshot completion, the connector listens for changes made to the employees table through the XOUT outbound server.
    • Streams the changes to the Kafka topic, cflt.SAMPLE.EMPLOYEES.

    {
      "name": "oracle-connector",
      "config": {
        "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
        "tasks.max": "1",
        "database.hostname": "db.example.com",
        "database.port": "1521",
        "database.user": "C##CFLTUSER",
        "database.password": "secret",
        "database.dbname": "ORCLCDB",
        "database.service.name": "ORCLCDB",
        "database.pdb.name": "ORCLPDB1",
        "database.out.server.name": "XOUT",
        "topic.prefix": "cflt",
        "table.include.list": "SAMPLE.EMPLOYEES",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
      }
    }
    

    Note

    These are the minimum configurations required to run the connector. For detailed configurations, see configuration properties.

  2. Verify the connector configurations.

    confluent local services connect connector config <name-of-connector>
    

    The output should be similar to:

    {
      "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
      "name": "oracle-connector",
      "tasks.max": "1",
      "database.hostname": "db.example.com",
      "database.port": "1521",
      "database.user": "C##CFLTUSER",
      "database.password": "secret",
      "database.dbname": "ORCLCDB",
      "database.service.name": "ORCLCDB",
      "database.pdb.name": "ORCLPDB1",
      "database.out.server.name": "XOUT",
      "topic.prefix": "cflt",
      "table.include.list": "SAMPLE.EMPLOYEES",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://schema-registry:8081",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
    }
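Where the CLI is not available, the same load and verify steps can be sketched against the Kafka Connect REST interface with curl. The worker address http://localhost:8083 is an assumption (8083 is the Kafka Connect default port), and the function names are hypothetical helpers introduced for this example.

```shell
# Base URL of a Connect worker's REST interface (assumed; adjust to your worker).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Create a connector from a JSON file that contains "name" and "config".
create_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
    --data @"$1" "$CONNECT_URL/connectors"
}

# Return the stored configuration of the named connector.
get_connector_config() {
  curl -sS "$CONNECT_URL/connectors/$1/config"
}

# Return the connector and task state, for example RUNNING or FAILED.
get_connector_status() {
  curl -sS "$CONNECT_URL/connectors/$1/status"
}
```

With the configuration above saved to a file, you would run create_connector <path-to-config-file>, then get_connector_config oracle-connector and get_connector_status oracle-connector.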
    

Non-Container Database (Non-CDB)

Note

The non-CDB architecture is deprecated in Oracle Database 12c and desupported in Oracle Database 20c.

  1. Load the connector by passing a .json configuration file.

    confluent local services connect connector load <name-of-connector> --config <path-to-config-file>
    

    The connector setup below performs the following:

    • Connects to the database located at db.example.com on port 1521.
    • Initiates a snapshot of the employees table in the sample schema within the ORCLCDB database.
    • After snapshot completion, the connector listens for changes made to the employees table through the XOUT outbound server.
    • Streams the changes to the Kafka topic, cflt.SAMPLE.EMPLOYEES.

    {
      "name": "oracle-connector",
      "config": {
        "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
        "tasks.max": "1",
        "database.hostname": "db.example.com",
        "database.port": "1521",
        "database.user": "CFLTUSER",
        "database.password": "secret",
        "database.dbname": "ORCLCDB",
        "database.service.name": "ORCLCDB",
        "database.out.server.name": "XOUT",
        "topic.prefix": "cflt",
        "table.include.list": "SAMPLE.EMPLOYEES",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
      }
    }
    

    Note

    These are the minimum configurations required to run the connector. For detailed configurations, see configuration properties.

  2. Verify the connector configurations.

    confluent local services connect connector config <name-of-connector>
    

    The output should be similar to:

    {
      "connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
      "name": "oracle-connector",
      "tasks.max": "1",
      "database.hostname": "db.example.com",
      "database.port": "1521",
      "database.user": "CFLTUSER",
      "database.password": "secret",
      "database.dbname": "ORCLCDB",
      "database.service.name": "ORCLCDB",
      "database.out.server.name": "XOUT",
      "topic.prefix": "cflt",
      "table.include.list": "SAMPLE.EMPLOYEES",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://schema-registry:8081",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "schema.history.internal.kafka.topic": "__orcl-schema-changes.cflt",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
    }
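In both examples the topic name cflt.SAMPLE.EMPLOYEES lines up with the topic.prefix value followed by the schema and table name. The sketch below rebuilds the name under that assumption, which is inferred from the examples rather than from a stated naming rule. The commented consumer command assumes the Confluent Platform console tools are on the PATH and reuses the broker and Schema Registry addresses from the sample configuration.

```shell
topic_prefix="cflt"            # value of topic.prefix in the examples
table_id="SAMPLE.EMPLOYEES"    # schema and table from table.include.list

# Assumed pattern: <topic.prefix>.<schema>.<table>
topic="${topic_prefix}.${table_id}"
echo "$topic"

# To inspect the change events once the connector is running:
# kafka-avro-console-consumer --bootstrap-server kafka:9092 \
#   --topic "$topic" --from-beginning \
#   --property schema.registry.url=http://schema-registry:8081
```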
    

Connect to an Oracle Real Application Cluster (RAC) Database

Confluent recommends configuring the following properties to ensure that the connector will be able to connect and attach to the specific RAC instance running the XStream components:

  • Configure the database.hostname property to the Oracle RAC database SCAN address.

    Note

    If a SCAN address is unavailable, configure the database.hostname property to the hostname of the instance where the XStream components are running. You will need to manually reconfigure the connector whenever the instance running the XStream components changes.

  • Configure the database.service.name property to the auto-created Oracle XStream service.
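The two properties might look like the fragment below, which the sketch writes to a scratch file so it can be merged into the "config" section of the connector configuration. The file name, the SCAN address rac-scan.example.com, and the service name placeholder are all hypothetical; substitute the values from your own cluster.

```shell
# Hypothetical values: replace the SCAN address and the service name placeholder.
cat > rac-overrides.json <<'EOF'
{
  "database.hostname": "rac-scan.example.com",
  "database.service.name": "<xstream-service-name>"
}
EOF

cat rac-overrides.json
```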

Security

Using Native Network Encryption

Oracle database provides native network encryption and integrity to ensure data is secure during transit, without the need for setting up Transport Layer Security (TLS).

Encryption and integrity are managed based on a combination of client-side and server-side encryption settings, which can be configured using parameters in the sqlnet.ora configuration file. For more information on configuring Oracle Advanced Security for network encryption and integrity, see Support for Network Encryption and Integrity.

The connector uses the Oracle JDBC OCI driver to communicate with the Oracle database. You can use the database’s native network encryption and data integrity to securely transmit data between the connector and the Oracle database. Relevant configuration settings can be found in Table 9-2 OCI Driver Client Parameters for Encryption and Integrity.

To enable network encryption and integrity, configure the following parameters:

  • On the client: SQLNET.ENCRYPTION_CLIENT and SQLNET.CRYPTO_CHECKSUM_CLIENT
  • On the server: SQLNET.ENCRYPTION_SERVER and SQLNET.CRYPTO_CHECKSUM_SERVER

Additionally, specify strong encryption and crypto-checksum algorithms using:

  • On the client: SQLNET.ENCRYPTION_TYPES_CLIENT and SQLNET.CRYPTO_CHECKSUM_TYPES_CLIENT
  • On the server: SQLNET.ENCRYPTION_TYPES_SERVER and SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER

For more information, see Improving Native Network Encryption Security in the Oracle Database Security Guide.
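As one possible combination of these parameters, the sketch below writes example client and server settings to two scratch files. The file names are hypothetical, and REQUIRED, AES256, and SHA256 are choices made for this example rather than values the connector mandates. Merge the lines into the actual sqlnet.ora file on each side instead of overwriting it.

```shell
# Example client-side settings (the machine running the connector).
cat > sqlnet.ora.client-example <<'EOF'
SQLNET.ENCRYPTION_CLIENT = REQUIRED
SQLNET.ENCRYPTION_TYPES_CLIENT = (AES256)
SQLNET.CRYPTO_CHECKSUM_CLIENT = REQUIRED
SQLNET.CRYPTO_CHECKSUM_TYPES_CLIENT = (SHA256)
EOF

# Example server-side settings (the Oracle database host).
cat > sqlnet.ora.server-example <<'EOF'
SQLNET.ENCRYPTION_SERVER = REQUIRED
SQLNET.ENCRYPTION_TYPES_SERVER = (AES256)
SQLNET.CRYPTO_CHECKSUM_SERVER = REQUIRED
SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER = (SHA256)
EOF
```

Setting both sides to REQUIRED means a connection fails rather than falling back to unencrypted traffic when the two sides cannot agree on an algorithm.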

Using Transport Layer Security (TLS)

Transport Layer Security (TLS) can be configured to secure connections between the client (connector) and the Oracle database. Currently, only one-way TLS without client wallets is supported.

In one-way TLS, the database server presents a certificate to authenticate itself to the client (connector). The client needs access to the trusted Certificate Authority (CA) root certificate that signed the server’s certificate to verify it. Currently, the connector only supports certificates signed by well-known CAs, where the corresponding CA certificate is present in the default certificate store of the system running the connector.

For more information on configuring TLS, see the Configuring Transport Layer Security Authentication chapter of the Oracle Database Security Guide.

You can enable TLS connections between the connector and the Oracle database server by using the database.tls.mode configuration property:

  • Set database.tls.mode to one-way to enable TLS encryption and server authentication.

Note

When database.tls.mode is set to one-way, ensure that the port specified in database.port corresponds to the listener on the server that supports TLS connections.
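A sketch of the related properties is shown below, written to a scratch file for merging into the "config" section of the connector configuration. The file name is hypothetical, and port 2484 is only an assumption: it is a port commonly used for Oracle TLS listeners, so use whatever port your TLS listener is actually configured on.

```shell
# Port 2484 is an assumed TLS listener port; replace it with your own.
cat > tls-overrides.json <<'EOF'
{
  "database.tls.mode": "one-way",
  "database.port": "2484"
}
EOF

cat tls-overrides.json
```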