Signals and Actions for Oracle XStream CDC Source Connector for Confluent Platform
The connector offers a signaling mechanism to trigger specific actions, such as performing an ad-hoc snapshot of a table.
You can send signals through one or more channels, such as a database table or a Kafka
topic. To configure which channels are enabled, use the signal.enabled.channels configuration property.
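For example, the following sketch enables the Kafka channel alongside the default source channel (both channels are described in the sections that follow):

signal.enabled.channels=source,kafka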
Channels
You must explicitly configure signaling for each connector you want to use it with. The required configuration varies depending on the signaling channel you use.
Source signaling channel
To set up a source signaling channel, create a signaling table with a specific schema in the source database. To send a signal, insert a record into this table. The connector enables this channel by default.
Each connector instance must have its own unique signaling table.
Create a signaling table
In the source database, create a signaling table to send signals to the connector. The table must follow the structure below, with columns in the exact order shown:
| Column | Type | Notes | 
|---|---|---|
| id | VARCHAR2 | A unique identifier for the signal instance, typically a UUID string. | 
| type | VARCHAR2 | The type of signal being sent. | 
| data | VARCHAR2 | JSON-formatted parameters that provide additional details for the signal action. | 
The following example creates a signaling table named cflt_signals that follows the required structure:
CREATE TABLE cflt_signals (
  id VARCHAR2(42) PRIMARY KEY,
  type VARCHAR2(32) NOT NULL,
  data VARCHAR2(2048)
);
Additionally, ensure the following (a combined sketch of both steps follows this list):
- Enable supplemental logging for this table. For detailed instructions, see Configure supplemental logging.
- Grant the database user configured on the connector the necessary privileges on this table. For detailed instructions, see Configure database users.
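The following SQL is a minimal sketch of both steps. The schema name OE and the user name C##CFLT_USER are hypothetical; see the linked topics for the authoritative instructions:

-- Log before/after values for all columns of the signaling table
ALTER TABLE OE.CFLT_SIGNALS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Allow the connector's database user to read and insert signal records
GRANT SELECT, INSERT ON OE.CFLT_SIGNALS TO C##CFLT_USER;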
Add the signaling table to Oracle XStream rules
To ensure that signals added to the signaling table are captured and sent to the connector during streaming, you must add the signaling table to the rule set of both the capture process and the outbound server.
For more information on adding tables to the rule sets of the capture process and outbound server, see Configure the table capture set.
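The following PL/SQL is a minimal sketch of one way to do this. The capture process (CFLT_CAPTURE), queue (CFLT_QUEUE), outbound server (CFLT_XOUT), and schema (OE) names are hypothetical; adapt them to your environment:

-- Add the signaling table to the capture process rule set
BEGIN
  DBMS_XSTREAM_ADM.ADD_TABLE_RULES(
    table_name   => 'OE.CFLT_SIGNALS',
    streams_type => 'capture',
    streams_name => 'CFLT_CAPTURE',
    queue_name   => 'CFLT_QUEUE',
    include_dml  => TRUE,
    include_ddl  => FALSE);
END;
/

-- Add the signaling table to the outbound server rule set
BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name => 'CFLT_XOUT',
    table_names => 'OE.CFLT_SIGNALS',
    add         => TRUE);
END;
/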
Add the signaling table to connector configuration
Set the signal.data.collection configuration property to the fully-qualified name of
the signaling table, using the format: <databaseName>.<schemaName>.<tableName>.
Note
The case of the object names in the fully-qualified name must exactly match how they are stored in the Oracle database. Unquoted identifiers are case-insensitive and are treated as uppercase by default, whereas quoted identifiers are case-sensitive.
For more information about the signal.data.collection property,
see Connector configuration properties.
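For example, assuming the cflt_signals table was created with an unquoted name (stored as CFLT_SIGNALS) in the OE schema of the ORCLPDB1 database used elsewhere in this topic:

signal.data.collection=ORCLPDB1.OE.CFLT_SIGNALS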
Kafka signaling channel
To configure a Kafka signaling channel, add kafka to the signal.enabled.channels
configuration property and specify the name of the Kafka topic that will receive
signals using the signal.kafka.topic property.
Each connector instance must have its own unique signaling topic. The topic must have a single partition to ensure signal ordering is preserved. Once you enable this channel, the connector creates a Kafka consumer to read signals from the configured topic.
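For example, the following sketch enables the Kafka channel and names the signaling topic (cflt-signals is a hypothetical topic name):

signal.enabled.channels=source,kafka
signal.kafka.topic=cflt-signals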
Kafka signal requirements
Each Kafka message used as a signal must meet the following requirements:
- The message key must match the value of the topic.prefix configuration property.
- The message value must be a JSON object containing the type and data fields.
Use the StringConverter for both the key and value when producing records to the signaling topic.
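For example, assuming topic.prefix is set to cflt (a hypothetical value), a valid signal record would look like the following:

Key:   cflt
Value: {"type": "execute-snapshot", "data": {"type": "BLOCKING", "data-collections": ["ORCLPDB1.OE.ORDERS"]}}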
Configure the signal topic consumer
You can configure the Kafka consumer that the connector uses to read from the signal topic by
using pass-through configuration properties with the prefix signal.consumer. The connector
removes this prefix before passing the properties to the consumer. For example, to set the
signal consumer’s security.protocol property to SSL, use signal.consumer.security.protocol=SSL.
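For example, the following sketch connects the signal consumer to a cluster that requires SSL; the truststore path and password are placeholder values:

signal.consumer.security.protocol=SSL
signal.consumer.ssl.truststore.location=/etc/kafka/secrets/truststore.jks
signal.consumer.ssl.truststore.password=<truststore-password>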
Actions
You can use signaling to instruct the connector to perform specific actions. This section describes the supported actions.
Ad-hoc blocking snapshot
An ad-hoc blocking snapshot captures a snapshot of specific tables after the connector has already started streaming changes. This action is useful when you update the connector configuration to include new tables or need to backfill missing data into Kafka topics.
To trigger an ad-hoc blocking snapshot, send a signal with type set to execute-snapshot
and data.type set to blocking. When the connector receives this signal, it temporarily
pauses streaming, performs the snapshot using the same process as the initial snapshot,
and then resumes streaming once the snapshot is complete.
Note
A short delay may occur between when the signal is sent and when streaming pauses for the snapshot to begin. As a result, after the snapshot completes, the connector might emit some change events that duplicate those captured in the snapshot.
Within the signal, you can specify a subset of tables from the source database to include in the snapshot. For each table, you can also define additional conditions to control which records are captured. All tables specified in the signal must be part of the connector’s table inclusion list.
The following table describes the structure of the data element in an ad-hoc blocking snapshot signal:
| Field | Default | Notes |
|---|---|---|
| type | n/a | Specifies the type of snapshot to run. Currently, only blocking is supported. |
| data-collections | n/a | An array of regular expressions matching the fully-qualified names of the tables to snapshot, using the format <databaseName>.<schemaName>.<tableName>. The case of the object names must exactly match how they are stored in the Oracle database. Unquoted identifiers are case-insensitive and are treated as uppercase by default, whereas quoted identifiers are case-sensitive. |
| additional-conditions | n/a | (Optional) An array of objects that define additional conditions the connector uses to determine which records to include in the snapshot. Each object specifies the filtering criteria for one table using two parameters: data-collection, the fully-qualified name of the table the condition applies to, and filter, a SQL SELECT statement that returns the records to include in the snapshot. |
When an ad-hoc snapshot is initiated for a table that already has an associated Kafka topic, the connector appends the snapshot data to that topic. If the topic was deleted or does not exist, the connector can automatically create a new one if automatic topic creation is enabled.
Change events generated during an ad-hoc blocking snapshot have the snapshot field in the
source block set to a value other than false, and the op field set to r, indicating a READ operation.
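For example, a trimmed change event payload from an ad-hoc blocking snapshot might look like the following; all other fields are omitted, and the exact snapshot value depends on the event's position within the snapshot:

{
  "op": "r",
  "source": {
    "snapshot": "true"
  }
}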
Using a source channel
To initiate an ad-hoc blocking snapshot using the source channel, insert a signal record into the signaling table on the source database.
The following example shows how to send a signal to initiate an ad-hoc blocking snapshot
for the tables ORDERS and ORDER_LINE in the OE schema of the ORCLPDB1 database:
INSERT INTO cflt_signals (id, type, data)
VALUES (
  '6af62216-7086-439b-b915-51825f9e49f7',
  'execute-snapshot',
  '{
      "type": "blocking",
      "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"]
   }'
);
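Commit the transaction after inserting the signal record. The outbound server delivers only committed changes, so the connector does not receive the signal until the insert is committed.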
If you want the snapshot to include only a subset of data from a table, you can add
an additional-conditions parameter to the signal.
The following example initiates an ad-hoc blocking snapshot that captures orders from
the ORDERS table for a specific customer with ID 42, and order lines from the ORDER_LINE
table where the quantity ordered is greater than 5:
INSERT INTO cflt_signals (id, type, data)
VALUES (
  '8a474adc-caca-4cf4-85d7-2b42e056ef5c',
  'execute-snapshot',
  '{
      "type": "blocking",
      "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"],
      "additional-conditions": [
        {
          "data-collection": "ORCLPDB1.OE.ORDERS",
          "filter": "SELECT * FROM OE.ORDERS WHERE o_c_id = 42"
        },
        {
          "data-collection": "ORCLPDB1.OR.ORDER_LINE",
          "filter": "SELECT * FROM OE.ORDER_LINE WHERE ol_quantity > 5"
        }
      ]
    }'
);
Using a Kafka channel
To initiate an ad-hoc blocking snapshot using the Kafka channel, send a signal message to the signaling topic configured on the connector.
The key of the Kafka message must match the value of the topic.prefix connector configuration
property. The value of the message must be a JSON object that includes both type and data fields.
The following example shows a Kafka signal message value that initiates an ad-hoc blocking snapshot
for the tables ORDERS and ORDER_LINE in the OE schema of the ORCLPDB1 database:
{
  "type": "execute-snapshot",
  "data": {
    "type": "BLOCKING",
    "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"]
  }
}
To snapshot only a subset of data from a table, add an additional-conditions parameter to
the signal. This allows you to define specific filters for each table using SQL SELECT statements.
The following example initiates an ad-hoc blocking snapshot that captures orders from the ORDERS
table for a specific customer with ID 42, and order lines from the ORDER_LINE table where the
quantity ordered is greater than 5:
{
  "type": "execute-snapshot",
  "data": {
    "type": "BLOCKING",
    "data-collections": ["ORCLPDB1.OE.ORDERS", "ORCLPDB1.OE.ORDER_LINE"],
    "additional-conditions": [
      {
        "data-collection": "ORCLPDB1.OE.ORDERS",
        "filter": "SELECT * FROM OE.ORDERS WHERE o_c_id = 42"
      },
      {
        "data-collection": "ORCLPDB1.OE.ORDER_LINE",
        "filter": "SELECT * FROM OE.ORDER_LINE WHERE ol_quantity > 5"
      }
    ]
  }
}