Examples for Oracle XStream CDC Source Connector for Confluent Cloud

The following sections provide examples for configuring and managing both the Oracle XStream CDC Source connector and the Oracle XStream components on the database.

Configure the table capture set

The connector uses two configuration properties to identify which tables to capture from the database:

  • table.include.list: This configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should be captured.
  • table.exclude.list: This configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should not be captured.

In addition to the connector configuration properties, the tables to be captured from the database must be specified in the rule sets of the capture process and outbound server to which the connector is attached.

Capture multiple tables

The following example demonstrates how to configure an outbound server and its capture process to capture changes from the ORDERS and ORDER_LINE tables in the OE schema of the ORCLPDB1 database.

You can specify the tables and schemas in the following two ways:

In this method, each table is specified as an element of the associative array.

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1)  := 'oe.orders';
  tables(2)  := 'oe.order_line';
  schemas(1) := NULL;
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name           =>  'xout',
    source_container_name =>  'ORCLPDB1',
    table_names           =>  tables,
    schema_names          =>  schemas);
END;

Each table should be specified as [<schema-name>.]table-name. If the schema name is not specified, the current user schema is the default.

If both the table_names and schema_names parameters are specified in the CREATE_OUTBOUND procedure call, the outbound server will stream DML and DDL changes for the specified tables and schemas.

For more information on using the DBMS_XSTREAM_ADM.CREATE_OUTBOUND procedure, see CREATE_OUTBOUND Procedure.

Note

You can query the ALL_XSTREAM_RULES view to see the rules for the capture process and outbound server.

For the connector’s table.include.list configuration, the following value can be used: oe.orders,oe.order_line.

Capture multiple tables and schemas

The following example demonstrates how to configure an outbound server and its capture process to capture changes from the ORDERS and ORDER_LINE tables in the OE schema, as well as tables in the HR schema of the ORCLPDB1 database.

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1)  := 'oe.orders';
  tables(2)  := 'oe.order_line';
  schemas(1) := 'hr';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name           =>  'xout',
    source_container_name =>  'ORCLPDB1',
    table_names           =>  tables,
    schema_names          =>  schemas);
END;

For the connector’s table.include.list configuration, the following value can be used: oe\\.(orders|order_line),hr\\..+.

Add tables to the capture set

The following example demonstrates how to add the NEW_ORDER table in the OE schema to the outbound server and its capture process.

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1)  := 'oe.new_order';
  schemas(1) := NULL;
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name           =>  'xout',
    source_container_name =>  'ORCLPDB1',
    table_names           =>  tables,
    schema_names          =>  schemas,
    add                   =>  true);
END;

If the add parameter is set to true, the ALTER_OUTBOUND procedure adds the specified tables and schemas to the existing XStream Out configuration.

For more information on using the DBMS_XSTREAM_ADM.ALTER_OUTBOUND procedure, see ALTER_OUTBOUND Procedure.

The connector’s table.include.list configuration can be updated to: oe\\.(orders|order_line|new_order).

Remove tables from the capture set

The following example demonstrates how to remove the NEW_ORDER table in the OE schema from the outbound server and its capture process.

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1)  := 'oe.new_order';
  schemas(1) := NULL;
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name           =>  'xout',
    source_container_name =>  'ORCLPDB1',
    table_names           =>  tables,
    schema_names          =>  schemas,
    add                   =>  false);
END;

If the add parameter is set to false, the ALTER_OUTBOUND procedure removes the specified tables and schemas from the existing XStream Out configuration.

The connector’s table.include.list configuration can be updated to: oe\\.(orders|order_line).

Add an outbound server to an existing capture process

This section demonstrates how to add an outbound server to an existing capture process, enabling the scaling of streaming performance through the use of additional connector instances. The configuration includes a single capture process with two outbound servers and connectors to distribute the processing load. The schema is based on the TPROC-C workload.

Step 1: Create a capture process, queue, and outbound server

The following example demonstrates how to create an outbound server, along with a capture process and queue.

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1)  := 'tpcc.WAREHOUSE';
  tables(2)  := 'tpcc.DISTRICT';
  tables(3)  := 'tpcc.STOCK';
  tables(4)  := 'tpcc.CUSTOMER';
  tables(5)  := 'tpcc.ORDERS';
  tables(6)  := 'tpcc.ITEM';
  tables(7)  := 'tpcc.HISTORY';
  tables(8)  := 'tpcc.NEW_ORDER';
  schemas(1) := NULL;
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name           =>  'xout',
    source_container_name =>  'ORCLPDB1',
    table_names           =>  tables,
    schema_names          =>  schemas);
END;

This procedure creates an outbound server named xout, a queue named Q$_XOUT_1 and a capture process named CAP$_XOUT_1.

Note

You can query the ALL_XSTREAM_OUTBOUND view to get the names of the capture process and queue associated with the outbound server.

Step 2: Add a second outbound server

To scale streaming performance, add a second outbound server using the same capture process and queue. The following script demonstrates how to do this:

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1)  := 'tpcc.ORDER_LINE';
  schemas(1) := NULL;
  DBMS_XSTREAM_ADM.ADD_OUTBOUND(
    server_name           =>  'xout2',
    queue_name            =>  'Q$_XOUT_1',
    capture_name          =>  'CAP$_XOUT_1',
    source_container_name =>  'ORCLPDB1',
    table_names           =>  tables,
    schema_names          =>  schemas);
END;

This procedure adds a second outbound server named xout2, which handles the processing for the ORDER_LINE table.

The ORDER_LINE table experiences the highest rate of change (accounting for almost half of the workload) and is assigned to the second outbound server (xout2). To distribute event processing across multiple connectors, provision two connector instances. Each connector should be configured to use a different outbound server using the database.out.server.name configuration property.

Note

An XStream outbound server can support only one active client session at a time. This means multiple connectors cannot be attached to the same outbound server simultaneously. As a result, separate outbound servers must be configured for each connector.

Create a capture process and an outbound server starting from a specific SCN

This section demonstrates how to set up a capture process and an outbound server to capture changes from the ORDERS table in the OE schema, starting from a specific SCN.

To perform the following steps, connect to the root container as the XStream administrator. The examples in this section use c##cfltadmin as the XStream administrator.

Create queue and queue table

The following example creates a queue table named xs_queue_tbl and a queue named xs_queue if they do not already exist.

BEGIN
  DBMS_XSTREAM_ADM.SET_UP_QUEUE(
    queue_table => 'c##cfltadmin.xs_queue_tbl',
    queue_name  => 'c##cfltadmin.xs_queue');
END;

For more information on using the DBMS_XSTREAM_ADM.SET_UP_QUEUE procedure, see SET_UP_QUEUE Procedure in the Oracle documentation.

Create a capture process

The following example creates a capture process named xs_capture that starts capturing changes from the specified start_scn value. It enqueues the captured changes into the queue named xs_queue.

BEGIN
  DBMS_CAPTURE_ADM.CREATE_CAPTURE(
    queue_name       => 'c##cfltadmin.xs_queue',
    capture_name     => 'xs_capture',
    capture_user     => 'c##cfltadmin',
    first_scn        => <first_scn>,
    start_scn        => <start_scn>,
    source_database  => 'ORCLPDB1',
    source_root_name => 'ORCLCDB',
    capture_class    => 'XStream');
END;

The DBMS_CAPTURE_ADM.BUILD procedure must have been run at least once before on the source database, and the specified first_scn must correspond to the SCN value of a previous build that is still available in the redo log.

To determine whether the DBMS_CAPTURE_ADM.BUILD procedure has been run on the source database, you can query the FIRST_CHANGE# column from the V$ARCHIVED_LOG dynamic performance view where the DICTIONARY_BEGIN column is YES. Any returned FIRST_CHANGE# value can be used as the first_scn if the redo log containing that SCN value is still available.

For more information on using the DBMS_CAPTURE_ADM.CREATE_CAPTURE procedure, see CREATE_CAPTURE Procedure in the Oracle documentation.

Create capture process rules

The following example creates capture process rules (and a rule set if it does not already exist) to capture DML and DDL changes from the ORDERS table in the OE schema.

BEGIN
  DBMS_XSTREAM_ADM.ADD_TABLE_RULES(
    table_name            => 'oe.orders',
    streams_type          => 'capture',
    streams_name          => 'xs_capture',
    queue_name            => 'c##cfltadmin.xs_queue',
    include_dml           => TRUE,
    include_ddl           => TRUE,
    source_database       => 'ORCLPDB1',
    source_root_name      => 'ORCLCDB',
    source_container_name => 'ORCLPDB1');
END;

For more information on using the DBMS_XSTREAM_ADM.ADD_TABLE_RULES procedure, see ADD_TABLE_RULES Procedure in the Oracle documentation.

Add an outbound server

The following example creates an outbound server named xs_outbound which dequeues LCRs from the queue named xs_queue.

BEGIN
  DBMS_XSTREAM_ADM.ADD_OUTBOUND(
    server_name           => 'xs_outbound',
    queue_name            => 'c##cfltadmin.xs_queue',
    capture_name          => 'xs_capture',
    include_dml           => FALSE,
    include_ddl           => FALSE,
    source_database       => 'ORCLPDB1',
    source_root_name      => 'ORCLCDB',
    source_container_name => 'ORCLPDB1');
END;

For more information on using the DBMS_XSTREAM_ADM.ADD_OUTBOUND procedure, see ADD_OUTBOUND Procedure in the Oracle documentation.

Create outbound server rules

The following example creates outbound server rules (and a rule set if it does not already exist) to stream LCRs from the ORDERS table in the OE schema to the connector.

BEGIN
  DBMS_XSTREAM_ADM.ADD_TABLE_RULES(
    table_name            => 'oe.orders',
    streams_type          => 'apply',
    streams_name          => 'xs_outbound',
    queue_name            => 'c##cfltadmin.xs_queue',
    include_dml           => TRUE,
    include_ddl           => TRUE,
    source_database       => 'ORCLPDB1',
    source_root_name      => 'ORCLCDB',
    source_container_name => 'ORCLPDB1');
END;

For more information on using the DBMS_XSTREAM_ADM.ADD_TABLE_RULES procedure, see ADD_TABLE_RULES Procedure in the Oracle documentation.

Change the connect user

The following example updates the connect user to c##cfltuser. The connector must connect to the database as the connect user to attach to the outbound server and receive changes.

BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name  => 'xs_outbound',
    connect_user => 'c##cfltuser');
END;

For more information on using the DBMS_XSTREAM_ADM.ALTER_OUTBOUND procedure, see ALTER_OUTBOUND Procedure in the Oracle documentation.

Start the capture process

The following example starts a capture process named xs_capture.

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name => 'xs_capture');
END;

For more information on using the DBMS_CAPTURE_ADM.START_CAPTURE procedure, see START_CAPTURE Procedure in the Oracle documentation.

Start the outbound server

The following example starts an outbound server named xs_outbound.

BEGIN
  DBMS_XSTREAM_ADM.START_OUTBOUND(
    server_name => 'xs_outbound');
END;

For more information on using the DBMS_CAPTURE_ADM.START_OUTBOUND procedure, see START_OUTBOUND Procedure in the Oracle documentation.

Note

When a connector connects to an outbound server, it automatically starts both the outbound server and its associated capture process if either is disabled.

(Optional) Remove the Oracle XStream components

The following example demonstrates how to remove the Oracle XStream components created in this section when they are no longer needed.

Step 1: Stop the outbound server

BEGIN
  DBMS_XSTREAM_ADM.STOP_OUTBOUND(
    server_name => 'xs_outbound');
END;

For more information on using the DBMS_XSTREAM_ADM.STOP_OUTBOUND procedure, see STOP_OUTBOUND Procedure in the Oracle documentation.

Step 2: Drop the outbound server

BEGIN
  DBMS_XSTREAM_ADM.DROP_OUTBOUND(
    server_name => 'xs_outbound');
END;

For more information on using the DBMS_XSTREAM_ADM.DROP_OUTBOUND procedure, see DROP_OUTBOUND Procedure in the Oracle documentation.

Step 3: Stop the capture process

BEGIN
  DBMS_CAPTURE_ADM.STOP_CAPTURE(
    capture_name => 'xs_capture');
END;

For more information on using the DBMS_CAPTURE_ADM.STOP_CAPTURE procedure, see STOP_CAPTURE Procedure in the Oracle documentation.

Step 4: Drop the capture process

BEGIN
  DBMS_CAPTURE_ADM.DROP_CAPTURE(
    capture_name          => 'xs_capture',
    drop_unused_rule_sets => true);
END;

For more information on using the DBMS_CAPTURE_ADM.DROP_CAPTURE procedure, see DROP_CAPTURE Procedure in the Oracle documentation.

Step 5: Drop the queue and queue table

BEGIN
  DBMS_XSTREAM_ADM.REMOVE_QUEUE(
    queue_name               => 'xs_queue',
    drop_unused_queue_table  => true);
END;

For more information on using the DBMS_XSTREAM_ADM.REMOVE_QUEUE procedure, see REMOVE_QUEUE Procedure in the Oracle documentation.