Stream Changes from Oracle Database to Confluent Platform

The connector generates change events for database changes and sends them to Kafka topics. Each change event has a key and a value whose structures are derived from the schema of the table where the change occurred.

The key and the value of every message each consist of two components: a schema, which defines the structure of the payload, and a payload, which holds the actual data.

Keys

The key contains a field for each column in the table's primary key. If the table has no primary key but has a unique index, the connector uses the column(s) of the unique index as the key.

For example, consider the following employees table from the sample database schema:

CREATE TABLE sample.employees (
  id NUMBER(9) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(64) NOT NULL,
  last_name VARCHAR2(64),
  hire_date TIMESTAMP
);

In the following examples, the connector is configured to use the Avro data format, and the topic.prefix configuration property is set to cflt.

The key schema describes the structure of the key column(s). In this example, it has a single field, ID, of type int, which corresponds to the id column of the employees table. The field name is upper case because Oracle stores unquoted identifiers in upper case.

{
  "connect.name": "cflt.SAMPLE.EMPLOYEES.Key",
  "fields": [
    {
      "name": "ID",
      "type": "int"
    }
  ],
  "name": "Key",
  "namespace": "cflt.SAMPLE.EMPLOYEES",
  "type": "record"
}

The payload contains the value of the key column(s).

{
  "ID": 1
}

This key identifies the row in the SAMPLE.EMPLOYEES table whose id primary key column has the value 1. The cflt prefix in the schema name comes from the topic.prefix setting of the connector that produced the event.
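The following Python sketch restates the key rules as code. The helpers key_schema_name and build_key are hypothetical illustrations, not part of the connector; the sketch assumes a row is a dict keyed by upper-case column names and that you already know which columns form the primary key (or unique index).

```python
from typing import Any, Dict, Sequence


def key_schema_name(topic_prefix: str, schema: str, table: str) -> str:
    """Build a key schema name in the <prefix>.<SCHEMA>.<TABLE>.Key form used by the example."""
    return f"{topic_prefix}.{schema}.{table}.Key"


def build_key(row: Dict[str, Any], key_columns: Sequence[str]) -> Dict[str, Any]:
    """Project a row onto its key columns (primary key, or unique index if there is no PK)."""
    missing = [col for col in key_columns if col not in row]
    if missing:
        raise KeyError(f"row is missing key column(s): {missing}")
    return {col: row[col] for col in key_columns}


# Hypothetical row from SAMPLE.EMPLOYEES, using Oracle's upper-case column names.
row = {"ID": 1, "FIRST_NAME": "Jack", "LAST_NAME": "Reacher"}
print(key_schema_name("cflt", "SAMPLE", "EMPLOYEES"))  # cflt.SAMPLE.EMPLOYEES.Key
print(build_key(row, ["ID"]))  # {'ID': 1}
```

For a table keyed by a composite primary key or a multi-column unique index, pass every key column in key_columns; the resulting payload then has one field per column.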

Values

The structure of the value in a change event message mirrors that of the key. It includes both a schema section, which describes the structure of the payload, and a payload section, which holds the actual data.

The value schema describes the envelope structure of the payload and the fields nested within it:

{
  "connect.name": "cflt.SAMPLE.EMPLOYEES.Envelope",
  "connect.version": 2,
  "fields": [
    {
      "default": null,
      "name": "before",
      "type": [
        "null",
        {
          "connect.name": "cflt.SAMPLE.EMPLOYEES.Value",
          "fields": [
            ...
          ],
          "name": "Value",
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "after",
      "type": [
        "null",
        "Value"
      ]
    },
    {
      "name": "source",
      "type": {
        "connect.name": "io.confluent.connect.oracle.xstream.Source",
        "fields": [
          ...
        ],
        "name": "Source",
        "namespace": "io.confluent.connect.oracle.xstream",
        "type": "record"
      }
    },
    {
      "default": null,
      "name": "transaction",
      "type": [
        "null",
        {
          "connect.name": "event.block",
          "connect.version": 1,
          "fields": [
            ...
          ],
          "name": "block",
          "namespace": "event",
          "type": "record"
        }
      ]
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "default": null,
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "ts_us",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "ts_ns",
      "type": [
        "null",
        "long"
      ]
    }
  ],
  "name": "Envelope",
  "namespace": "cflt.SAMPLE.EMPLOYEES",
  "type": "record"
}

The value schema includes the following fields:

  • before: (Optional) The state of the row before the change occurred. The name of this schema takes the form <topic_prefix>.<schema_name>.<table_name>.Value. This field is null for read (snapshot), create, and truncate events.

  • after: (Optional) The state of the row after the change occurred. The name of this schema takes the form <topic_prefix>.<schema_name>.<table_name>.Value. This field is null for delete and truncate events.

  • transaction: (Optional) Metadata associated with the transaction, such as the transaction ID. This field is not supported in the current release.

  • op: (Mandatory) A string that describes the type of operation. The value is one of c (create or insert), u (update), d (delete), r (read, which indicates a snapshot), or t (truncate).

  • ts_ms (ts_us, ts_ns): (Optional) The time at which the connector generated the event, based on the system clock of the JVM that runs the Kafka Connect task, in milliseconds (ts_ms), microseconds (ts_us), or nanoseconds (ts_ns).

  • source: (Mandatory) Metadata about the source of the change event, such as the table name and the time at which the change occurred. The name of this schema is io.confluent.connect.oracle.xstream.Source.
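A consumer usually needs to decide which image describes the row for a given event. The following Python sketch, built around a hypothetical row_image helper, maps each documented op code to the before or after field. It assumes the value has already been deserialized into a plain dict (union wrappers removed) and that a tombstone arrives as None.

```python
from typing import Any, Dict, Optional

# Operation codes documented for the op field.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read", "t": "truncate"}


def row_image(value: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return the row state that best describes an event.

    None (tombstone) -> None
    c, r, u          -> the after image
    d                -> the before image
    t                -> None (both images are null for truncate)
    """
    if value is None:
        return None
    op = value["op"]
    if op not in OPS:
        raise ValueError(f"unknown op code: {op!r}")
    if op in ("c", "r", "u"):
        return value["after"]
    if op == "d":
        return value["before"]
    return None


event = {"before": None, "after": {"ID": 1, "FIRST_NAME": "Jack"}, "op": "r"}
print(OPS[event["op"]], row_image(event))  # read {'ID': 1, 'FIRST_NAME': 'Jack'}
```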

The source block contains the following fields:

  • connector: The connector name.

  • version: The connector version.

  • name: The value of the topic.prefix configuration property, for example, cflt.

  • db: The database name, for example, ORCLPDB1.

  • schema: The schema name, for example, SAMPLE.

  • table: The table name, for example, EMPLOYEES.

  • snapshot: Indicates whether the event is part of an ongoing snapshot, for example, last.

  • txId: The transaction ID (not applicable for snapshots), for example, 4.31.4721.

  • ts_ms (ts_us, ts_ns): The time at which the record changed in the source database. For snapshots, it is the time at which the snapshot occurred.

  • scn: The SCN (system change number) of the change. For snapshots, this is the snapshot SCN, for example, 49066206.

  • lcr_position: The position of the LCR (logical change record); null for snapshots. For example, 0000000002ed814d00000001000000010000000002ed814c000000010000000102.

  • user_name: The name of the user who made the change (not applicable for snapshots).

  • row_id: The row ID of the changed row (not applicable for snapshots).
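Because the envelope ts_ms records when the connector generated the event and source.ts_ms records when the row changed in the database, their difference gives a rough capture delay. The Python sketch below uses hypothetical helper names and assumes an already deserialized value dict. The two timestamps come from different systems (the database and the Connect worker JVM), so the result is only approximate and can be negative if clocks drift.

```python
from typing import Any, Dict, Optional


def qualified_table(source: Dict[str, Any]) -> str:
    """Build DB.SCHEMA.TABLE from the source block."""
    return f'{source["db"]}.{source["schema"]}.{source["table"]}'


def capture_lag_ms(value: Dict[str, Any]) -> Optional[int]:
    """Approximate delay between the database change and event generation.

    Returns None when either timestamp is missing.
    """
    generated = value.get("ts_ms")                  # connector's JVM clock
    changed = value.get("source", {}).get("ts_ms")  # change time in the database
    if generated is None or changed is None:
        return None
    return generated - changed


# Hypothetical event values, for illustration only.
value = {
    "ts_ms": 1_000_250,
    "source": {"db": "ORCLPDB1", "schema": "SAMPLE", "table": "EMPLOYEES", "ts_ms": 1_000_000},
}
print(qualified_table(value["source"]), capture_lag_ms(value))  # ORCLPDB1.SAMPLE.EMPLOYEES 250
```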

The before and after fields contain a field for each column in the table. The following examples show the types of change events that the connector can generate for the employees table.

The connector generates a read event (op=r) for each row it captures during a snapshot. In the following example, the after field contains the values of the row at the time of the snapshot:

{
  "before": null,
  "after": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 1,
      "FIRST_NAME": "Jack",
      "LAST_NAME": {
        "string": "Reacher"
      },
      "HIRE_DATE": {
        "long": 1704088800000000
      }
    }
  },
  ...
}
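In these examples, the NOT NULL column FIRST_NAME appears as a bare value, while nullable columns such as LAST_NAME and HIRE_DATE appear wrapped in a single-key object that names the Avro union branch ({"string": ...}, {"long": ...}), and the after record is wrapped in its full schema name. This follows Avro's JSON encoding of unions. Schema-aware Avro deserializers normally return unwrapped values, so the following Python sketch (a hypothetical unwrap helper) is only useful when you work with this JSON rendering directly. It is a heuristic: it treats a single-key dict as a union wrapper when the key is a primitive type name or a dotted full name (Avro field names cannot contain dots), so a one-field record whose field is named after a primitive type would be misread.

```python
from typing import Any

# Avro primitive type names that can label a union branch.
_PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def _is_union_wrapper(obj: Any) -> bool:
    """Heuristic check for a JSON-encoded union branch such as {"string": "Reacher"}."""
    if not isinstance(obj, dict) or len(obj) != 1:
        return False
    (label,) = obj.keys()
    return label in _PRIMITIVES or "." in label


def unwrap(obj: Any) -> Any:
    """Recursively strip union wrappers, leaving plain values and records."""
    if _is_union_wrapper(obj):
        (inner,) = obj.values()
        return unwrap(inner)
    if isinstance(obj, dict):
        return {k: unwrap(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [unwrap(v) for v in obj]
    return obj


event = {
    "before": None,
    "after": {
        "cflt.SAMPLE.EMPLOYEES.Value": {
            "ID": 1,
            "FIRST_NAME": "Jack",
            "LAST_NAME": {"string": "Reacher"},
            "HIRE_DATE": {"long": 1704088800000000},
        }
    },
    "op": "r",
}
print(unwrap(event)["after"])
# {'ID': 1, 'FIRST_NAME': 'Jack', 'LAST_NAME': 'Reacher', 'HIRE_DATE': 1704088800000000}
```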

The connector generates a create event (op=c) when a row is inserted into the source table. In the following example, the after field contains the values inserted into the columns of the row:

{
  "before": null,
  "after": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 2,
      "FIRST_NAME": "Ethan",
      "LAST_NAME": {
        "string": "Hunt"
      },
      "HIRE_DATE": {
        "long": 1711963800000000
      }
    }
  },
  ...
}
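The HIRE_DATE values in these examples are 16-digit numbers, which suggests they count microseconds since the Unix epoch; that unit is an assumption here rather than something this page states. Under that assumption, the following Python sketch converts such a value into a datetime. Oracle TIMESTAMP carries no time zone, so treat the UTC label as a convention for reading the number, not as a statement about the source session's zone.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_datetime(micros: int) -> datetime:
    """Convert microseconds since the Unix epoch to an aware datetime in UTC.

    Adding an integer timedelta avoids the float rounding that
    datetime.fromtimestamp(micros / 1e6) can introduce for large values.
    """
    return EPOCH + timedelta(microseconds=micros)


print(micros_to_datetime(1711963800000000).isoformat())  # 2024-04-01T09:30:00+00:00
```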

The connector generates an update event (op=u) when an existing row in the source table is updated. In the following example, the before field contains the state of the row before the update, and the after field contains the state of the row after the update:

{
  "before": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 2,
      "FIRST_NAME": "Ethan",
      "LAST_NAME": {
        "string": "Hunt"
      },
      "HIRE_DATE": {
        "long": 1711963800000000
      }
    }
  },
  "after": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 2,
      "FIRST_NAME": "Ethan",
      "LAST_NAME": {
        "string": "Blake"
      },
      "HIRE_DATE": {
        "long": 1711963800000000
      }
    }
  },
  ...
}
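Comparing the two images of an update event shows which columns changed. The following Python sketch uses a hypothetical changed_columns helper and assumes both images have already been unwrapped into plain dicts.

```python
from typing import Any, Dict, Optional, Tuple


def changed_columns(
    before: Optional[Dict[str, Any]], after: Optional[Dict[str, Any]]
) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs to an (old, new) pair."""
    before = before or {}
    after = after or {}
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }


before = {"ID": 2, "FIRST_NAME": "Ethan", "LAST_NAME": "Hunt", "HIRE_DATE": 1711963800000000}
after = {"ID": 2, "FIRST_NAME": "Ethan", "LAST_NAME": "Blake", "HIRE_DATE": 1711963800000000}
print(changed_columns(before, after))  # {'LAST_NAME': ('Hunt', 'Blake')}
```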

The connector emits three events when the primary key column(s) of an existing row are updated:

  • A DELETE event with the old key for the row.

  • A tombstone event with the old key for the row.

  • An INSERT event that provides the new key for the row.

    Note

    For tables that include large object (LOB) columns, the after field of the INSERT event will contain the unavailable value placeholder for any LOB columns that were not modified by the UPDATE statement.
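A consumer that keeps an in-memory copy of the table can apply these three events in order, and the row ends up under its new key only. The following Python sketch is hypothetical: apply_record and freeze are illustrative names, values are assumed to be unwrapped dicts, and the INSERT event is assumed to carry op c (the sketch upserts on any op other than d, so the exact code does not change the outcome here).

```python
from typing import Any, Dict, Optional, Tuple

FrozenKey = Tuple[Tuple[str, Any], ...]


def freeze(key: Dict[str, Any]) -> FrozenKey:
    """Turn a key payload such as {'ID': 2} into a hashable dict key."""
    return tuple(sorted(key.items()))


def apply_record(table: Dict[FrozenKey, Dict[str, Any]],
                 key: Dict[str, Any],
                 value: Optional[Dict[str, Any]]) -> None:
    """Apply one (key, value) record to an in-memory copy of the table."""
    k = freeze(key)
    if value is None or value["op"] == "d":
        table.pop(k, None)  # delete event or tombstone: drop the row
    else:
        table[k] = value["after"]  # create, read, or update: upsert


# Hypothetical change: UPDATE sample.employees SET id = 3 WHERE id = 2
table = {freeze({"ID": 2}): {"ID": 2, "LAST_NAME": "Blake"}}
events = [
    ({"ID": 2}, {"op": "d", "before": {"ID": 2, "LAST_NAME": "Blake"}, "after": None}),
    ({"ID": 2}, None),  # tombstone for the old key
    ({"ID": 3}, {"op": "c", "before": None, "after": {"ID": 3, "LAST_NAME": "Blake"}}),  # op assumed
]
for key, value in events:
    apply_record(table, key, value)
print(list(table.values()))  # [{'ID': 3, 'LAST_NAME': 'Blake'}]
```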

The connector generates a delete event (op=d) when a row is deleted from the source table. In the following example, the before field contains the state of the row before it was deleted:

{
  "before": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 1,
      "FIRST_NAME": "Jack",
      "LAST_NAME": {
        "string": "Reacher"
      },
      "HIRE_DATE": {
        "long": 1704088800000000
      }
    }
  },
  "after": null,
  ...
}

By default, the connector follows each delete event with a tombstone event, which has the same key and a null value. The tombstone lets Kafka log compaction remove all earlier events for that key. You can change this behavior with the tombstones.on.delete connector configuration property.
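To see why the tombstone matters, the following Python sketch models compaction in a simplified way: it keeps the last record for each key and drops keys whose last record has a null value. Real Kafka compaction runs in the background, does not clean the active segment, and retains tombstones for delete.retention.ms before removing them, so this is an illustration, not Kafka's algorithm. The compact helper and the string keys are hypothetical.

```python
from typing import Any, Dict, List, Optional, Tuple

Record = Tuple[str, Optional[Dict[str, Any]]]  # (serialized key, value or None)


def compact(log: List[Record]) -> Dict[str, Dict[str, Any]]:
    """Keep the last value per key; a trailing tombstone (None) removes the key."""
    latest: Dict[str, Optional[Dict[str, Any]]] = {}
    for key, value in log:
        latest[key] = value
    return {k: v for k, v in latest.items() if v is not None}


log = [
    ('{"ID": 1}', {"op": "r", "after": {"ID": 1}}),
    ('{"ID": 2}', {"op": "c", "after": {"ID": 2}}),
    ('{"ID": 1}', {"op": "d", "before": {"ID": 1}, "after": None}),
    ('{"ID": 1}', None),  # tombstone emitted after the delete
]
print(sorted(compact(log)))  # ['{"ID": 2}']
```

Without the final tombstone (as with log[:3]), the delete event itself is the last record for key 1, so compaction would keep it indefinitely.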

The connector generates a truncate event (op=t) when a source table is truncated. A truncate event has a null message key, and both its before and after fields are null:

{
  "before": null,
  "after": null,
  ...
}

By default, the connector does not capture truncate events. You can change this behavior with the skipped.operations configuration property.
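If you enable truncate events, consumers that assume every record has a key need a guard, because a truncate event has a null key. The following Python sketch shows one way a consumer might handle this in a local copy of the table; is_truncate and on_record are hypothetical names, and values are assumed to be unwrapped dicts.

```python
from typing import Any, Dict, Optional, Tuple

Rows = Dict[Tuple[Tuple[str, Any], ...], Dict[str, Any]]


def is_truncate(key: Optional[Dict[str, Any]], value: Optional[Dict[str, Any]]) -> bool:
    """A truncate event has a null key and op == 't'."""
    return key is None and value is not None and value.get("op") == "t"


def on_record(rows: Rows, key: Optional[Dict[str, Any]], value: Optional[Dict[str, Any]]) -> None:
    """Update a local copy of the table, checking for truncate before using the key."""
    if is_truncate(key, value):
        rows.clear()  # every row of the source table was removed
        return
    if key is None:
        return  # not a row-level event; ignored in this sketch
    k = tuple(sorted(key.items()))
    if value is None or value.get("op") == "d":
        rows.pop(k, None)  # delete event or tombstone
    else:
        rows[k] = value["after"]  # create, read, or update


rows: Rows = {}
on_record(rows, {"ID": 1}, {"op": "c", "before": None, "after": {"ID": 1}})
on_record(rows, None, {"op": "t", "before": None, "after": None})
print(rows)  # {}
```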

Note

For change event topics with multiple partitions, there is no ordering guarantee between change events (create, update, and so on) and truncate events for the same table. This ordering is guaranteed only for change event topics that have a single partition.