Signals and Actions for Oracle XStream CDC Source Connector for Confluent Platform

The connector offers a signaling mechanism to trigger specific actions, such as performing an ad-hoc snapshot of a table.

You can send signals through one or more channels, such as a database table or a Kafka topic. To configure which channels are enabled, use the signal.enabled.channels configuration property.

Channels

You must explicitly configure signaling for each connector you want to use it with. The required configuration varies depending on the signaling channel you use.

Source signaling channel

To set up a source signaling channel, create a signaling table with a specific schema in the source database. To send a signal, insert a record into this table. The connector enables this channel by default.

Each connector instance must have its own unique signaling table.

Create a signaling table

In the source database, create a signaling table to send signals to the connector. The table must follow the structure below, with columns in the exact order shown:

id (VARCHAR2)
    A unique identifier for the signal instance, typically a UUID string.

type (VARCHAR2)
    The type of signal being sent.

data (VARCHAR2)
    JSON-formatted parameters that provide additional details for the signal action.

The following example creates a signaling table named cflt_signals that follows the required structure:

CREATE TABLE cflt_signals (
  id VARCHAR2(42) PRIMARY KEY,   -- unique signal identifier, typically a UUID string
  type VARCHAR2(32) NOT NULL,    -- signal type, for example 'execute-snapshot'
  data VARCHAR2(2048)            -- JSON-formatted signal parameters
);

Additionally, ensure the following:

  1. Supplemental logging is enabled for this table. For detailed instructions, see Configure supplemental logging.

  2. The database user configured on the connector has the necessary privileges on this table. For detailed instructions, see Configure database users.

Add the signaling table to Oracle XStream rules

To ensure that signals added to the signaling table are captured and sent to the connector during streaming, you must add the signaling table to the rule set of both the capture process and the outbound server.

For more information on adding tables to the rule sets of the capture process and outbound server, see Configure the table capture set.

Add the signaling table to connector configuration

Set the signal.data.collection configuration property to the fully-qualified name of the signaling table, using the format: <databaseName>.<schemaName>.<tableName>.

Note

The case of the object names in the fully-qualified name must exactly match how they are stored in the Oracle database. Unquoted identifiers are case-insensitive and are treated as uppercase by default, whereas quoted identifiers are case-sensitive.

For more information about the signal.data.collection property, see Connector configuration properties.
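For example, the following connector configuration excerpt points the connector at the cflt_signals table created earlier. This is a minimal sketch: the CFLT schema name is illustrative, so substitute the schema that owns your signaling table. Because the table was created with unquoted identifiers, Oracle stores the names in uppercase, and the value must be uppercase as well:

{
  "signal.data.collection": "ORCLPDB1.CFLT.CFLT_SIGNALS"
}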

Kafka signaling channel

To configure a Kafka signaling channel, add kafka to the signal.enabled.channels configuration property and specify the name of the Kafka topic that will receive signals using the signal.kafka.topic property.

Each connector instance must have its own unique signaling topic. The topic must have a single partition to ensure signal ordering is preserved. Once you enable this channel, the connector creates a Kafka consumer to read signals from the configured topic.
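For example, the following configuration excerpt is a minimal sketch that enables the Kafka channel alongside the default source channel; it assumes channel names are given as a comma-separated list, and the topic name cflt-signals is illustrative:

{
  "signal.enabled.channels": "source,kafka",
  "signal.kafka.topic": "cflt-signals"
}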

Kafka signal requirements

Each Kafka message used as a signal must meet the following requirements:

  • The message key must match the value of the topic.prefix configuration property.

  • The message value must be a JSON object containing the type and data fields.

Use the StringConverter for both the key and value when producing records to the signaling topic.
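For example, assuming the connector is configured with topic.prefix set to cflt (an illustrative value), a complete signal message produced to the signaling topic would look like the following, with both the key and the value serialized as strings:

key:   cflt
value: {"type": "execute-snapshot", "data": {"type": "BLOCKING", "data-collections": ["ORCLPDB1.OE.ORDERS"]}}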

Configure the signal topic consumer

You can configure the Kafka consumer that the connector uses to read from the signal topic by using pass-through configuration properties with the prefix signal.consumer. The connector removes this prefix before passing the properties to the consumer. For example, to set the signal consumer’s security.protocol property to SSL, use signal.consumer.security.protocol=SSL.
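For example, the following excerpt is a minimal sketch of pass-through settings for reading signals from a TLS-secured Kafka cluster; the truststore path and password are placeholders:

{
  "signal.consumer.security.protocol": "SSL",
  "signal.consumer.ssl.truststore.location": "/etc/kafka/secrets/truststore.jks",
  "signal.consumer.ssl.truststore.password": "<truststore-password>"
}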

Actions

You can use signaling to instruct the connector to perform specific actions. This section describes the supported actions.

Ad-hoc blocking snapshot

An ad-hoc blocking snapshot captures a snapshot of specific tables after the connector has already started streaming changes. This action is useful when you update the connector configuration to include new tables or need to backfill missing data into Kafka topics.

To trigger an ad-hoc blocking snapshot, send a signal with type set to execute-snapshot and data.type set to blocking. When the connector receives this signal, it temporarily pauses streaming, performs the snapshot using the same process as the initial snapshot, and then resumes streaming once the snapshot is complete.

Note

A short delay may occur between the time the signal is sent and the time streaming pauses to begin the snapshot. As a result, after the snapshot completes, the connector might emit some change events that duplicate records captured in the snapshot.

Within the signal, you can specify a subset of tables from the source database to include in the snapshot. For each table, you can also define additional conditions to control which records are captured. All tables specified in the signal must be part of the connector’s table inclusion list.
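Putting this together, the following sketch shows the general shape of an execute-snapshot signal payload; the placeholder names are left as-is, and the optional additional-conditions field is omitted:

{
  "type": "execute-snapshot",
  "data": {
    "type": "blocking",
    "data-collections": ["<databaseName>.<schemaName>.<tableName>"]
  }
}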

The data element of an ad-hoc blocking snapshot signal contains the following fields, none of which has a default value:

type
    Specifies the type of snapshot to run. Currently, only blocking snapshots are supported.

data-collections
    An array of regular expressions matching the fully-qualified names of the tables to snapshot, using the format: <databaseName>.<schemaName>.<tableName>.

    Note: The case of the object names must exactly match how they are stored in the Oracle database. Unquoted identifiers are case-insensitive and are treated as uppercase by default, whereas quoted identifiers are case-sensitive.

additional-conditions
    (Optional) An array that defines additional conditions the connector uses to determine which records to include in the snapshot. Each additional condition is an object that specifies the filtering criteria.

You can set the following parameters for each additional condition:

  • data-collection: The fully-qualified name of the table the filter applies to. The case of the name must exactly match how it is stored in the Oracle database, following the same case sensitivity rules as above.

  • filter: A SELECT statement that determines the subset of table rows to include in the snapshot. The case of schema and table names must exactly match how they are stored in the Oracle database, following the same case sensitivity rules as above. Quoted identifiers must be enclosed in double quotes.

When an ad-hoc snapshot is initiated for a table that already has an associated Kafka topic, the connector appends the snapshot data to that topic. If the topic was deleted or does not exist, the connector can automatically create a new one if automatic topic creation is enabled.

Change events generated during an ad-hoc blocking snapshot have the snapshot field in the source block set to a value other than false, and the op field set to r, indicating a READ operation.
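As an illustration, a heavily trimmed change event from an ad-hoc blocking snapshot might look like the following; all other envelope and source fields are omitted, and "true" stands in for whatever non-false value the connector assigns:

{
  "op": "r",
  "source": {
    "snapshot": "true"
  }
}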

Using a source channel

To initiate an ad-hoc blocking snapshot using the source channel, insert a signal record into the signaling table on the source database.

The following example shows how to send a signal to initiate an ad-hoc blocking snapshot for the tables ORDERS and ORDER_LINE in the OE schema of the ORCLPDB1 database:

INSERT INTO cflt_signals (id, type, data)
VALUES (
  '6af62216-7086-439b-b915-51825f9e49f7',
  'execute-snapshot',
  '{
      "type": "blocking",
      "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"]
   }'
);

If you want the snapshot to include only a subset of data from a table, you can add an additional-conditions parameter to the signal.

The following example initiates an ad-hoc blocking snapshot that captures orders from the ORDERS table for a specific customer with ID 42, and order lines from the ORDER_LINE table where the quantity ordered is greater than 5:

INSERT INTO cflt_signals (id, type, data)
VALUES (
  '8a474adc-caca-4cf4-85d7-2b42e056ef5c',
  'execute-snapshot',
  '{
      "type": "blocking",
      "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"],
      "additional-conditions": [
        {
          "data-collection": "ORCLPDB1.OE.ORDERS",
          "filter": "SELECT * FROM OE.ORDERS WHERE o_c_id = 42"
        },
        {
          "data-collection": "ORCLPDB1.OR.ORDER_LINE",
          "filter": "SELECT * FROM OE.ORDER_LINE WHERE ol_quantity > 5"
        }
      ]
    }'
);

Using a Kafka channel

To initiate an ad-hoc blocking snapshot using the Kafka channel, send a signal message to the signaling topic configured on the connector.

The key of the Kafka message must match the value of the topic.prefix connector configuration property. The value of the message must be a JSON object that includes both type and data fields.

The following example shows a Kafka signal message value that initiates an ad-hoc blocking snapshot for the tables ORDERS and ORDER_LINE in the OE schema of the ORCLPDB1 database.

{
  "type": "execute-snapshot",
  "data": {
    "type": "BLOCKING",
    "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"]
  }
}

To snapshot only a subset of data from a table, add an additional-conditions parameter to the signal. This allows you to define specific filters for each table using SQL SELECT statements.

The following example initiates an ad-hoc blocking snapshot that captures orders from the ORDERS table for a specific customer with ID 42, and order lines from the ORDER_LINE table where the quantity ordered is greater than 5:

{
  "type": "execute-snapshot",
  "data": {
    "type": "BLOCKING",
    "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"],
    "additional-conditions": [
      {
        "data-collection": "ORCLPDB1.OE.ORDERS",
        "filter": "SELECT * FROM OE.ORDERS WHERE o_c_id = 42"
      },
      {
        "data-collection": "ORCLPDB1.OE.ORDER_LINE",
        "filter": "SELECT * FROM OE.ORDER_LINE WHERE ol_quantity > 5"
      }
    ]
  }
}