Examples for Oracle XStream CDC Source Connector for Confluent Cloud¶
The following sections provide examples for configuring and managing both the Oracle XStream CDC Source connector and the Oracle XStream components on the database.
Configure the table capture set¶
The connector uses two configuration properties to identify which tables to capture from the database:
table.include.list
: This configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should be captured.
table.exclude.list
: This configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should not be captured.
In addition to the connector configuration properties, the tables to be captured from the database must be specified in the rule sets of the capture process and outbound server to which the connector is attached.
Capture multiple tables¶
The following example demonstrates how to configure an outbound server and its capture process to capture changes from the ORDERS and ORDER_LINE tables in the OE schema of the ORCLPDB1 database.
You can specify the tables and schemas in the following two ways:
In the first method, each table is specified as an element of an associative array.
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := 'oe.orders';
tables(2) := 'oe.order_line';
schemas(1) := NULL;
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'xout',
source_container_name => 'ORCLPDB1',
table_names => tables,
schema_names => schemas);
END;
Each table should be specified as [<schema-name>.]table-name. If the schema name is not specified, the current user schema is the default.
In the second method, the tables are specified as a single comma-delimited string.
DECLARE
tables VARCHAR2(4000) := 'oe.orders,oe.order_line';
BEGIN
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'xout',
source_container_name => 'ORCLPDB1',
table_names => tables);
END;
If both the table_names and schema_names parameters are specified in the CREATE_OUTBOUND procedure call, the outbound server will stream DML and DDL changes for the specified tables and schemas.
For more information on using the DBMS_XSTREAM_ADM.CREATE_OUTBOUND procedure, see CREATE_OUTBOUND Procedure.
Note
You can query the ALL_XSTREAM_RULES view to see the rules for the capture process and outbound server.
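As a minimal sketch of such a query, the following lists the rules by XStream component. The column selection and ordering are illustrative choices rather than part of the original example, and the rows returned depend on the privileges of the connected user.

```sql
-- List the rules attached to each capture process and outbound server.
-- STREAMS_TYPE distinguishes capture rules from outbound (apply) rules.
SELECT streams_name,
       streams_type,
       streams_rule_type,
       schema_name,
       object_name,
       rule_set_type
FROM   all_xstream_rules
ORDER  BY streams_name, schema_name, object_name;
```

After running the CREATE_OUTBOUND call above, you would expect to see entries for the OE.ORDERS and OE.ORDER_LINE tables under both the capture process and the outbound server.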
For the connector’s table.include.list configuration, the following value can be used: oe.orders,oe.order_line.
Capture multiple tables and schemas¶
The following example demonstrates how to configure an outbound server and its capture process to capture changes from the ORDERS and ORDER_LINE tables in the OE schema, as well as all tables in the HR schema of the ORCLPDB1 database.
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := 'oe.orders';
tables(2) := 'oe.order_line';
schemas(1) := 'hr';
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'xout',
source_container_name => 'ORCLPDB1',
table_names => tables,
schema_names => schemas);
END;
For the connector’s table.include.list configuration, the following value can be used: oe\\.(orders|order_line),hr\\..+.
Add tables to the capture set¶
The following example demonstrates how to add the NEW_ORDER table in the OE schema to the outbound server and its capture process.
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := 'oe.new_order';
schemas(1) := NULL;
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
server_name => 'xout',
source_container_name => 'ORCLPDB1',
table_names => tables,
schema_names => schemas,
add => true);
END;
If the add parameter is set to true, the ALTER_OUTBOUND procedure adds the specified tables and schemas to the existing XStream Out configuration.
For more information on using the DBMS_XSTREAM_ADM.ALTER_OUTBOUND procedure, see ALTER_OUTBOUND Procedure.
The connector’s table.include.list configuration can be updated to: oe\\.(orders|order_line|new_order).
Remove tables from the capture set¶
The following example demonstrates how to remove the NEW_ORDER table in the OE schema from the outbound server and its capture process.
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := 'oe.new_order';
schemas(1) := NULL;
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
server_name => 'xout',
source_container_name => 'ORCLPDB1',
table_names => tables,
schema_names => schemas,
add => false);
END;
If the add parameter is set to false, the ALTER_OUTBOUND procedure removes the specified tables and schemas from the existing XStream Out configuration.
The connector’s table.include.list configuration can be updated to: oe\\.(orders|order_line).
Add an outbound server to an existing capture process¶
This section demonstrates how to add an outbound server to an existing capture process so that streaming throughput can be scaled with additional connector instances. The configuration uses a single capture process with two outbound servers, each serving its own connector, to distribute the processing load. The schema is based on the TPROC-C workload.
Step 1: Create a capture process, queue, and outbound server¶
The following example demonstrates how to create an outbound server, along with a capture process and queue.
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := 'tpcc.WAREHOUSE';
tables(2) := 'tpcc.DISTRICT';
tables(3) := 'tpcc.STOCK';
tables(4) := 'tpcc.CUSTOMER';
tables(5) := 'tpcc.ORDERS';
tables(6) := 'tpcc.ITEM';
tables(7) := 'tpcc.HISTORY';
tables(8) := 'tpcc.NEW_ORDER';
schemas(1) := NULL;
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'xout',
source_container_name => 'ORCLPDB1',
table_names => tables,
schema_names => schemas);
END;
This procedure creates an outbound server named xout, a queue named Q$_XOUT_1, and a capture process named CAP$_XOUT_1.
Note
You can query the ALL_XSTREAM_OUTBOUND view to get the names of the capture process and queue associated with the outbound server.
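A minimal sketch of that lookup follows. The selected columns are an illustrative subset, and the query is left unfiltered so that it does not depend on how the server name is stored in the data dictionary.

```sql
-- Show each outbound server with its associated capture process and queue.
-- The CAPTURE_NAME and QUEUE_NAME values are what a later ADD_OUTBOUND call
-- needs when it attaches another outbound server to the same capture process.
SELECT server_name,
       capture_name,
       queue_owner,
       queue_name,
       connect_user,
       status
FROM   all_xstream_outbound
ORDER  BY server_name;
```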
Step 2: Add a second outbound server¶
To scale streaming performance, add a second outbound server using the same capture process and queue. The following script demonstrates how to do this:
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := 'tpcc.ORDER_LINE';
schemas(1) := NULL;
DBMS_XSTREAM_ADM.ADD_OUTBOUND(
server_name => 'xout2',
queue_name => 'Q$_XOUT_1',
capture_name => 'CAP$_XOUT_1',
source_container_name => 'ORCLPDB1',
table_names => tables,
schema_names => schemas);
END;
This procedure adds a second outbound server named xout2, which handles the processing for the ORDER_LINE table.
The ORDER_LINE table experiences the highest rate of change (accounting for almost half of the workload) and is assigned to the second outbound server (xout2). To distribute event processing across multiple connectors, provision two connector instances. Configure each connector to attach to a different outbound server by setting the database.out.server.name configuration property.
Note
An XStream outbound server supports only one active client session at a time, so multiple connectors cannot be attached to the same outbound server simultaneously. As a result, a separate outbound server must be configured for each connector.
Create a capture process and an outbound server starting from a specific SCN¶
This section demonstrates how to set up a capture process and an outbound server to capture changes from the ORDERS table in the OE schema, starting from a specific SCN.
To perform the following steps, connect to the root container as the XStream administrator. The examples in this section use c##cfltadmin as the XStream administrator.
Create queue and queue table¶
The following example creates a queue table named xs_queue_tbl and a queue named xs_queue if they do not already exist.
BEGIN
DBMS_XSTREAM_ADM.SET_UP_QUEUE(
queue_table => 'c##cfltadmin.xs_queue_tbl',
queue_name => 'c##cfltadmin.xs_queue');
END;
For more information on using the DBMS_XSTREAM_ADM.SET_UP_QUEUE procedure, see SET_UP_QUEUE Procedure in the Oracle documentation.
Create a capture process¶
The following example creates a capture process named xs_capture that starts capturing changes from the specified start_scn value. It enqueues the captured changes into the queue named xs_queue.
BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'c##cfltadmin.xs_queue',
capture_name => 'xs_capture',
capture_user => 'c##cfltadmin',
first_scn => <first_scn>,
start_scn => <start_scn>,
source_database => 'ORCLPDB1',
source_root_name => 'ORCLCDB',
capture_class => 'XStream');
END;
The DBMS_CAPTURE_ADM.BUILD procedure must have been run at least once before on the source database, and the specified first_scn must correspond to the SCN value of a previous build that is still available in the redo log.
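If no usable build exists, a new one can be created. The following is a minimal sketch, assuming a client such as SQL*Plus that supports SET SERVEROUTPUT and the / block terminator, and a user with the privileges needed to run the procedure. The variable name scn is illustrative.

```sql
SET SERVEROUTPUT ON

DECLARE
  scn NUMBER;  -- receives the SCN at which the dictionary build starts
BEGIN
  -- Extract the data dictionary to the redo log and return the build SCN.
  DBMS_CAPTURE_ADM.BUILD(first_scn => scn);
  DBMS_OUTPUT.PUT_LINE('First SCN value: ' || scn);
END;
/
```

The printed value is a candidate for the first_scn parameter of CREATE_CAPTURE, provided the corresponding redo remains available.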
To determine whether the DBMS_CAPTURE_ADM.BUILD procedure has been run on the source database, you can query the FIRST_CHANGE# column from the V$ARCHIVED_LOG dynamic performance view where the DICTIONARY_BEGIN column is YES. Any returned FIRST_CHANGE# value can be used as the first_scn if the redo log containing that SCN value is still available.
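The check described above can be sketched as the following query. The STATUS = 'A' filter is an added assumption intended to list only archived logs that are still marked as available; drop it to see every recorded build.

```sql
-- Find archived redo logs that contain the start of a dictionary build.
-- Each FIRST_CHANGE# returned is a candidate first_scn value.
SELECT DISTINCT
       first_change#,
       name
FROM   v$archived_log
WHERE  dictionary_begin = 'YES'
AND    status = 'A'
ORDER  BY first_change#;
```

If the query returns no rows, no usable build is recorded in the archived logs, and the BUILD procedure needs to be run before the capture process is created.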
For more information on using the DBMS_CAPTURE_ADM.CREATE_CAPTURE procedure, see CREATE_CAPTURE Procedure in the Oracle documentation.
Create capture process rules¶
The following example creates capture process rules (and a rule set if it does not already exist) to capture DML and DDL changes from the ORDERS table in the OE schema.
BEGIN
DBMS_XSTREAM_ADM.ADD_TABLE_RULES(
table_name => 'oe.orders',
streams_type => 'capture',
streams_name => 'xs_capture',
queue_name => 'c##cfltadmin.xs_queue',
include_dml => TRUE,
include_ddl => TRUE,
source_database => 'ORCLPDB1',
source_root_name => 'ORCLCDB',
source_container_name => 'ORCLPDB1');
END;
For more information on using the DBMS_XSTREAM_ADM.ADD_TABLE_RULES procedure, see ADD_TABLE_RULES Procedure in the Oracle documentation.
Add an outbound server¶
The following example creates an outbound server named xs_outbound, which dequeues LCRs from the queue named xs_queue.
BEGIN
DBMS_XSTREAM_ADM.ADD_OUTBOUND(
server_name => 'xs_outbound',
queue_name => 'c##cfltadmin.xs_queue',
capture_name => 'xs_capture',
include_dml => FALSE,
include_ddl => FALSE,
source_database => 'ORCLPDB1',
source_root_name => 'ORCLCDB',
source_container_name => 'ORCLPDB1');
END;
For more information on using the DBMS_XSTREAM_ADM.ADD_OUTBOUND procedure, see ADD_OUTBOUND Procedure in the Oracle documentation.
Create outbound server rules¶
The following example creates outbound server rules (and a rule set if it does not already exist) to stream LCRs from the ORDERS table in the OE schema to the connector.
BEGIN
DBMS_XSTREAM_ADM.ADD_TABLE_RULES(
table_name => 'oe.orders',
streams_type => 'apply',
streams_name => 'xs_outbound',
queue_name => 'c##cfltadmin.xs_queue',
include_dml => TRUE,
include_ddl => TRUE,
source_database => 'ORCLPDB1',
source_root_name => 'ORCLCDB',
source_container_name => 'ORCLPDB1');
END;
For more information on using the DBMS_XSTREAM_ADM.ADD_TABLE_RULES procedure, see ADD_TABLE_RULES Procedure in the Oracle documentation.
Change the connect user¶
The following example updates the connect user to c##cfltuser. The connector must connect to the database as the connect user to attach to the outbound server and receive changes.
BEGIN
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
server_name => 'xs_outbound',
connect_user => 'c##cfltuser');
END;
For more information on using the DBMS_XSTREAM_ADM.ALTER_OUTBOUND procedure, see ALTER_OUTBOUND Procedure in the Oracle documentation.
Start the capture process¶
The following example starts a capture process named xs_capture.
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'xs_capture');
END;
For more information on using the DBMS_CAPTURE_ADM.START_CAPTURE procedure, see START_CAPTURE Procedure in the Oracle documentation.
Start the outbound server¶
The following example starts an outbound server named xs_outbound.
BEGIN
DBMS_XSTREAM_ADM.START_OUTBOUND(
server_name => 'xs_outbound');
END;
For more information on using the DBMS_XSTREAM_ADM.START_OUTBOUND procedure, see START_OUTBOUND Procedure in the Oracle documentation.
Note
When a connector connects to an outbound server, it automatically starts both the outbound server and its associated capture process if either is disabled.
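To confirm that both components are running, their status can be read from the data dictionary. The following is a minimal sketch, assuming the names used in this section and a user who can see these objects in the ALL_CAPTURE and ALL_XSTREAM_OUTBOUND views. The UPPER() comparison is a defensive assumption about how the names are stored.

```sql
-- Status of the capture process, with the SCNs it was created with.
SELECT capture_name, status, first_scn, start_scn
FROM   all_capture
WHERE  UPPER(capture_name) = 'XS_CAPTURE';

-- Status of the outbound server and the user allowed to attach to it.
SELECT server_name, status, connect_user
FROM   all_xstream_outbound
WHERE  UPPER(server_name) = 'XS_OUTBOUND';
```

A STATUS of ENABLED in both result sets indicates that the capture process and the outbound server have been started.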
(Optional) Remove the Oracle XStream components¶
The following example demonstrates how to remove the Oracle XStream components created in this section when they are no longer needed.
Step 1: Stop the outbound server¶
BEGIN
DBMS_XSTREAM_ADM.STOP_OUTBOUND(
server_name => 'xs_outbound');
END;
For more information on using the DBMS_XSTREAM_ADM.STOP_OUTBOUND procedure, see STOP_OUTBOUND Procedure in the Oracle documentation.
Step 2: Drop the outbound server¶
BEGIN
DBMS_XSTREAM_ADM.DROP_OUTBOUND(
server_name => 'xs_outbound');
END;
For more information on using the DBMS_XSTREAM_ADM.DROP_OUTBOUND procedure, see DROP_OUTBOUND Procedure in the Oracle documentation.
Step 3: Stop the capture process¶
BEGIN
DBMS_CAPTURE_ADM.STOP_CAPTURE(
capture_name => 'xs_capture');
END;
For more information on using the DBMS_CAPTURE_ADM.STOP_CAPTURE procedure, see STOP_CAPTURE Procedure in the Oracle documentation.
Step 4: Drop the capture process¶
BEGIN
DBMS_CAPTURE_ADM.DROP_CAPTURE(
capture_name => 'xs_capture',
drop_unused_rule_sets => true);
END;
For more information on using the DBMS_CAPTURE_ADM.DROP_CAPTURE procedure, see DROP_CAPTURE Procedure in the Oracle documentation.
Step 5: Drop the queue and queue table¶
BEGIN
DBMS_XSTREAM_ADM.REMOVE_QUEUE(
queue_name => 'xs_queue',
drop_unused_queue_table => true);
END;
For more information on using the DBMS_XSTREAM_ADM.REMOVE_QUEUE procedure, see REMOVE_QUEUE Procedure in the Oracle documentation.