Oracle Database Sink Connector for Confluent Cloud

The managed Oracle Database Sink connector for Confluent Cloud allows you to export data from Apache Kafka® topics to an Oracle database (JDBC). The connector polls data from Kafka to write to the database based on the topic subscription. It is possible to achieve idempotent writes with upserts. Auto-creation of tables and limited auto-evolution is also supported.

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

The Oracle Database Sink connector supports the following features:

  • Idempotent writes: The default insert.mode is INSERT. If it is configured as UPSERT, the connector will use upsert semantics rather than plain insert statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence.
  • SSL support: Supports one-way SSL.
  • Schemas: The connector supports Avro, JSON Schema, and Protobuf input value formats. The connector supports Avro, JSON Schema, Protobuf, and String input key formats. Schema Registry must be enabled to use a Schema Registry-based format.
  • Primary key support: Supported PK modes are kafka, none, record_key, and record_value. Used in conjunction with the PK Fields property.
  • Table and column auto-creation: auto.create and auto.evolve are supported. If tables or columns are missing, they can be created automatically. Table names are created based on Kafka topic names.
  • At least once delivery: This connector guarantees that records from the Kafka topic are delivered at least once.
  • Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance.

See Configuration Properties for all available configuration property descriptions. See Cloud connector limitations for additional information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Oracle Database Sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • Authorized access to an Oracle database.
  • The database and Kafka cluster should be in the same region. If you use a different region, be aware that you may incur additional data transfer charges.
  • For networking considerations, see Internet access to resources. To use static egress IPs, see Static Egress IP Addresses.
  • The Confluent Cloud CLI installed and configured for the cluster. See Install the Confluent Cloud CLI.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the sink connector.
  • See Database considerations for additional information.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the Oracle Database Sink connector icon.

Oracle Database Sink Connector Icon

Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  1. Select one or more topics.

  2. Enter a connector Name.

  3. Select an Input (data coming from the Kafka topic) message value format: AVRO, JSON_SR (JSON Schema), or PROTOBUF. Select an Input key format: STRING, AVRO, JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (i.e., Avro, JSON_SR (JSON Schema), or Protobuf).

  4. Select whether or not to have the connector delete null record values. The default value is false. This selection requires record_key as the primary key mode (PK mode).

  5. Enter your Kafka Cluster credentials. The credentials are either the cluster API key and secret or the service account API key and secret.

  6. Enter the Oracle database connection details. For details, see Database Connection Details.

    • The Connection host entry will look similar to database-1.<id>.us-west-2.rds.amazonaws.com.
    • The default SSL mode is verify-full. When set to verify-full, the connector uses a truststore file to verify that the host server is trustworthy, and also verifies that the server host name matches its certificate. When verify-full is used, you must also supply the Distinguished name (DN) for the server host. If set to verify-ca, the connector uses the truststore file only to verify that the host server is trustworthy. The connector uses the truststore file to check the certificate chain up to a trusted certificate authority (CA).
    • You use the Trust store button to upload the truststore file that contains the CA information. You must add the truststore file password.
  7. Select one of the following insert modes:

    • INSERT: Use the standard INSERT row function. An error occurs if the row already exists in the table.
    • UPSERT: This mode is similar to INSERT. However, if the row already exists, the UPSERT function overwrites column values with the new values provided.
  8. Enter a Table name format. This is a format string to use for the destination table name. This property may contain ${topic} as a placeholder for the originating topic name. For example, kafka_${topic} for the topic orders maps to the table name kafka_orders.

  9. Select your Database time zone. Defaults to UTC.

  10. Select a Primary Key mode (optional). Supported modes are listed below:

    • kafka: Kafka coordinates are used as the primary key. Must be used with the PK Fields property.
    • none: No primary keys used.
    • record_key: Fields from the Kafka record key are used. This may be a primitive or a struct.
    • record_value: Fields from the Kafka record value are used. This must be a struct type.

    Important

    With some JDBC dialects, for example the Oracle and MySQL dialects, an exception can occur if you set pk.mode to kafka and auto.create to true. The exception occurs because the connector maps a STRING type to a variable length string (for example TEXT) and not a fixed length string (for example VARCHAR(256)). A primary key must have a fixed length. To avoid this exception, consider the following:

    • Do not set auto.create to true.
    • Create the database table and primary key data type in advance.
    • See Database considerations for additional information.
  11. Enter the PK Fields values. This is a list of comma-separated primary key field names. The runtime interpretation of this property depends on the PK mode selected. Options are listed below:

    • kafka: Must be three values representing the Kafka coordinates. If left empty, the coordinates default to __connect_topic,__connect_partition,__connect_offset.
    • none: PK Fields property is not used.
    • record_key: Used to extract fields from the record key. If left empty, all fields from the key struct are used. A primitive key requires a single field name.
    • record_value: Used to extract fields from the record value. If left empty, all fields from the value struct are used.
  12. Select whether to automatically create a table if none exists. See the note above about setting pk.mode to kafka and auto.create to true.

  13. Select whether to automatically add columns to the table when they are missing relative to the Kafka record schema.

  14. Enter the maximum number of rows included in batched records. The default value is 3000. You can use this value to limit the amount of data buffered internally by the connector.

  15. Enter the maximum number of tasks the connector can run. More tasks may improve performance.

  16. Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details.

See Configuration Properties for configuration property values and descriptions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 7: Check for records.

Verify that rows are populating the database.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe OracleDatabaseSink

Example output:

Following are the required configs:
connector.class: OracleDatabaseSink
input.data.format
name
kafka.api.key
kafka.api.secret
connection.host
connection.port
connection.user
connection.password
db.name
ssl.server.cert.dn
ssl.rootcertfile
tasks.max
topics

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "OracleDatabaseSink",
  "input.data.format": "AVRO",
  "name": "OracleDatabaseSink_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "************************************************",
  "connection.host ": "<connection-host",
  "connection.port": "1521",
  "connection.user": "<user-name>",
  "connection.password": "<user-password>",
  "db.name": "<database-name>",
  "ssl.server.cert.dn": "<distinquished-database-server-name>",
  "ssl.rootcertfile": "<certificate-text>",
  "tasks.max": "1",
  "topics": "<topic-name>",
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "input.data.format": Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, and PROTOBUF. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See input.key.format in the Configuration Properties for additional options.
  • "name": Sets a name for your new connector.
  • "kafka.api.key" and ""kafka.api.secret": These credentials are either the cluster API key and secret or the service account API key and secret.
  • "connection.<...>": The database connection properties. The connection.host entry will look similar to database-1.<id>.us-west-2.rds.amazonaws.com. For details, see Database Connection Details.
  • "ssl.rootcertfile": The default ssl.mode is verify-full. Use the property ssl.rootcertfile and add the contents of the text certificate file for the property value. For example, "ssl.rootcertfile": "<certificate-text>". See the Configuration Properties for additional ssl.mode options.
  • "ssl.server.cert.dn": The default ssl.mode is verify-full. With this mode, you must provide the distinguished server name. See the Configuration Properties for additional ssl.mode options.
  • "tasks.max": Enter the maximum number of tasks for the connector to use. More tasks may improve performance (that is, consumer lag is reduced with multiple tasks running).
  • "topics": Enter the topic name or a comma-separated list of topic names.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.

See Configuration Properties for configuration property values and descriptions.

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config oracle-db-sink-config.json

Example output:

Created connector OracleDatabaseSink_0 lcc-do6vzd

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID           |             Name         | Status  | Type | Trace
+------------+--------------------------+---------+------+-------+
lcc-do6vzd   | OracleDatabaseSink_0     | RUNNING | sink |       |

Step 6: Check for records.

Verify that rows are populating the database.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Configuration Properties

The following connector configuration properties can be used with the Oracle Database Sink connector for Confluent Cloud.

input.data.format

The data input value format. Can be AVRO, JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (i.e., Avro, JSON_SR (JSON Schema), or Protobuf).

  • Type: string
  • Importance: high
input.key.format

The data input key format. Can be STRING, AVRO, JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (i.e., Avro, JSON_SR (JSON Schema), or Protobuf). This is not required.

  • Type: string
  • Importance: high
delete.enabled

When set to true, null record values are deleted. This requires pk.mode to be set to record_key.

  • Type: string
  • Default: false
  • Importance: high
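
For example, a configuration fragment that enables deletes pairs the two properties as follows (a sketch; the surrounding required properties are omitted):

"delete.enabled": "true",
"pk.mode": "record_key"
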
connection.host

The database connection host. For example: database-1.<id>.us-west-2.rds.amazonaws.com.

  • Type: string
  • Importance: high
connection.port

The database connection port. For example: 1521. The minimum value accepted is 0 and the maximum value is 65535.

  • Type: string
  • Importance: high
connection.password

Database connection user password.

  • Type: password
  • Importance: high
db.name

Database name.

  • Type: string
  • Importance: high
ssl.mode

The default option verify-full is enabled if ssl.mode is not added to the connector configuration. When set to verify-full, the connector uses a truststore file property (ssl.rootcertfile) to verify that the host server is trustworthy, and also verifies that the server host name matches its certificate. When verify-full is used, you must also supply the distinguished server name (ssl.server.cert.dn) for the server host. If set to verify-ca, the connector uses the truststore file only to verify that the host server is trustworthy. The connector uses the truststore file to check the certificate chain up to a trusted certificate authority (CA).

  • Type: string
  • Default: verify-full
  • Importance: high
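
For example, a configuration fragment that checks only the certificate chain might look like the following (a sketch; the certificate contents are supplied as text, as described for ssl.rootcertfile below):

"ssl.mode": "verify-ca",
"ssl.rootcertfile": "<certificate-text>"
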
ssl.rootcertfile

The text file containing certificate information. This is required if using verify-ca or verify-full SSL mode.

  • Type: password
  • Importance: low
ssl.server.cert.dn

The distinguished server name. This is required if using verify-full SSL mode.

  • Type: password
  • Importance: low
insert.mode

The insertion mode to use. Supported modes are:

insert
Use the standard insert row function. An error occurs if the row already exists in the table.
upsert
This mode is similar to insert. However, if the row already exists, the upsert function overwrites column values with the new values provided.
  • Type: string
  • Default: insert
  • Valid Values: [insert, upsert]
  • Importance: high
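
On Oracle, upsert semantics are conceptually similar to a MERGE statement. The following is a sketch for a hypothetical kafka_orders table keyed on id; the statement the connector actually generates may differ:

-- Conceptual upsert for a hypothetical kafka_orders table keyed on id.
-- Bind values (:id, :amount) come from the Kafka record.
MERGE INTO kafka_orders t
USING (SELECT :id AS id, :amount AS amount FROM dual) s
ON (t.id = s.id)
WHEN MATCHED THEN
  UPDATE SET t.amount = s.amount
WHEN NOT MATCHED THEN
  INSERT (id, amount) VALUES (s.id, s.amount);
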
table.name.format

The format string to use for the destination table name. This property may contain ${topic} as a placeholder for the originating topic name. For example, kafka_${topic} for the topic orders maps to the table name kafka_orders.

  • Type: string
  • Default: ${topic}
  • Importance: medium
db.timezone

Name of the database timezone that should be used in the connector when inserting time-based values. Defaults to UTC.

  • Type: string
  • Default: “UTC”
  • Valid Values: Any valid database time zone
  • Importance: medium
pk.mode

The primary key mode. Used in conjunction with the pk.fields property. Supported modes are:

none
No keys utilized.
kafka

Apache Kafka® coordinates are used as the primary key.

Important

With some JDBC dialects, for example the Oracle and MySQL dialects, an exception can occur if you set pk.mode to kafka and auto.create to true. The exception occurs because the connector maps a STRING type to a variable length string (for example TEXT) and not a fixed length string (for example VARCHAR(256)). A primary key must have a fixed length. To avoid this exception, consider the following:

  • Do not set auto.create to true.
  • Create the database table and primary key data type in advance.
record_key
Field(s) from the record key are used, which may be a primitive or a struct.
record_value
Field(s) from the record value are used, which must be a struct.
  • Type: string
  • Default: none
  • Valid Values: [none, kafka, record_key, record_value]
  • Importance: high
pk.fields

List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode:

none
Ignored. No fields are used as a primary key.
kafka
Must be a trio representing the Kafka coordinates, defaults to __connect_topic,__connect_partition,__connect_offset if empty. Custom field names that are used for this mode will rename the default column names, but keep the Kafka coordinates as the primary keys.
record_key
If empty, all fields from the key struct will be used. Otherwise, this property is used to extract the desired fields. If using a primitive, only a single field name must be configured.
record_value
If empty, all fields from the value struct will be used, otherwise used to extract the desired fields.
  • Type: list
  • Default: none
  • Importance: medium
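
For example, a fragment using fields from the record value (the field names shown are hypothetical):

"pk.mode": "record_value",
"pk.fields": "order_id,customer_id"

or, using Kafka coordinates with the default field names:

"pk.mode": "kafka",
"pk.fields": "__connect_topic,__connect_partition,__connect_offset"
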
auto.create

Whether to automatically create the destination table based on record schema if it is found to be missing by issuing CREATE.

  • Type: boolean
  • Default: false
  • Importance: medium

Important

With some JDBC dialects, for example the Oracle and MySQL dialects, an exception can occur if you set pk.mode to kafka and auto.create to true. The exception occurs because the connector maps a STRING type to a variable length string (for example TEXT) and not a fixed length string (for example VARCHAR(256)). A primary key must have a fixed length. To avoid this exception, consider the following:

  • Do not set auto.create to true.
  • Create the database table and primary key data type in advance.
  • See Database considerations for additional information.
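
As a sketch of creating the table and primary key in advance when pk.mode is set to kafka with the default coordinate field names (the table name kafka_orders and the amount column are hypothetical), the table could be created as follows, after which the connector runs with auto.create set to false:

-- Quoted identifiers keep the lowercase names the connector expects
-- (see Database considerations). VARCHAR2(256) gives the topic name a
-- fixed length so it can be part of the primary key; a CLOB column cannot.
CREATE TABLE "kafka_orders" (
  "__connect_topic"     VARCHAR2(256) NOT NULL,
  "__connect_partition" NUMBER(10)    NOT NULL,
  "__connect_offset"    NUMBER(19)    NOT NULL,
  "amount"              NUMBER(10),
  PRIMARY KEY ("__connect_topic", "__connect_partition", "__connect_offset")
);
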
auto.evolve

Whether to automatically add columns in the table schema when found to be missing relative to the record schema by issuing ALTER.

  • Type: boolean
  • Default: false
  • Importance: medium
batch.size

Maximum number of rows to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.

  • Type: int
  • Default: 3000
  • Valid Values: [1,…5000]
  • Importance: low

Database considerations

Keep the following issues in mind.

  1. String type is mapped to CLOB when auto.create=true. For example, you have the following Avro schema:

    {
      "connect.name": "ksql.ratings",
      "fields": [
        {
          "name": "rating_id",
          "type": "long"
        },
        {
          "name": "user_id",
          "type": "int"
        },
        ...
        {
          "name": "channel",
          "type": "string"
        },
        {
          "name": "message",
          "type": "string"
        }
      ],
      "name": "ratings",
      "namespace": "ksql",
      "type": "record"
    }
    

    The string fields (channel and message) are mapped to CLOB in the table schema:

    Name        Null?    Type
    ----------- -------- ----------
    rating_id   NOT NULL NUMBER(19)
    user_id     NOT NULL NUMBER(10)
    stars       NOT NULL NUMBER(10)
    route_id    NOT NULL NUMBER(10)
    rating_time NOT NULL NUMBER(19)
    channel     NOT NULL CLOB
    message     NOT NULL CLOB
    

    Since String is mapped to CLOB when auto.create=true, a String-typed field cannot be used as a primary key. If you want to use a String field as a primary key, create the table in the database first and then use auto.create=false (see the sketch at the end of this section). Otherwise, an exception occurs that contains the following line:

    ...
    "stringValue": "Exception chain:\njava.sql.SQLException: ORA-02329:
    column of datatype LOB cannot be unique or a primary key
    ...
    
  2. The table name and column names are case sensitive. For example, you have the following Avro schema:

    {
      "connect.name": "ksql.pageviews",
      "fields": [
        {
          "name": "viewtime",
          "type": "long"
        },
        {
          "name": "userid",
          "type": "string"
        },
        {
          "name": "pageid",
          "type": "string"
        }
      ],
      "name": "pageviews",
      "namespace": "ksql",
      "type": "record"
    }
    

    If you create the destination table with an unquoted name, as shown below, Oracle stores it as PAGEVIEWS. The connector then fails with an exception because it cannot find a table named pageviews.

    create table pageviews (
      userid VARCHAR(10) NOT NULL PRIMARY KEY,
      pageid VARCHAR(50),
      viewtime VARCHAR(50)
      );
    
    Table PAGEVIEWS created.
    
    DESC pageviews;
    Name     Null?    Type
    -------- -------- ------------
    USERID   NOT NULL VARCHAR2(10)
    PAGEID            VARCHAR2(50)
    VIEWTIME          VARCHAR2(50)
    

    An exception message similar to the following one will be in the DLQ:

    {
      "key": "__connect.errors.exception.message",
      "stringValue": "Table \"pageviews\" is missing and auto-creation
      is disabled"
    }
    

    To resolve this issue, create a table in Oracle Database first and use auto.create=false.

    create table "pageviews" (
      "userid" VARCHAR(10) NOT NULL PRIMARY KEY,
      "pageid" VARCHAR(50),
      "viewtime" VARCHAR(50)
      );
    
    Table "pageviews" created.
    
    
    DESC "pageviews";
    
    Name     Null?    Type
    -------- -------- ------------
    userid   NOT NULL VARCHAR2(10)
    pageid            VARCHAR2(50)
    viewtime          VARCHAR2(50)
    

    Note

    SQL standards define databases to be case insensitive for identifiers and keywords unless they are quoted. This means that CREATE TABLE test_case creates a table named TEST_CASE, while CREATE TABLE "test_case" creates a table named test_case. The same is true for table column identifiers. For additional information about identifier quoting, see Database Identifiers, Quoting, and Case Sensitivity.
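
Returning to the first consideration above, the following sketch pre-creates the ratings table so that a String-typed field can serve as the primary key (the choice of channel as the key, the column lengths, and the omission of the elided fields are illustrative). With the table in place, run the connector with auto.create=false:

-- Quoted, lowercase identifiers match the names the connector expects.
-- VARCHAR2(256) gives the String-typed channel field a fixed length so it
-- can be a primary key; the auto-created CLOB column cannot (ORA-02329).
CREATE TABLE "ratings" (
  "rating_id" NUMBER(19)     NOT NULL,
  "user_id"   NUMBER(10)     NOT NULL,
  "channel"   VARCHAR2(256)  NOT NULL PRIMARY KEY,
  "message"   VARCHAR2(4000)
);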

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.
