
Monday, July 7, 2014

Change Data Capture using Streams

I found this dusty script on my old shelves; it enables the Change Data Capture feature on an Oracle Database 11.2.0.4 Enterprise Edition instance running on RHEL 6.2 64-bit.

I remember this script was written to keep the changes of a product table in a CDC table so that a Java process could read the CDC$xxx change table. That consumer process, which reads the changed records, was developed by our talented Java development group and is, understandably, beyond the scope of this post.

Here, I will share how I enabled the Change Data Capture feature in my Oracle database.

There are some preparation steps required before starting Change Data Capture. Most of them resemble the Streams preparation scripts, which makes sense since Streams is the underlying technology of this feature.



--prepare the publisher user

--publisher user needs the following privileges and quotas

grant execute_catalog_role to dba_planet;
grant select_catalog_role to dba_planet;
grant execute on dbms_cdc_publish to dba_planet;
grant create sequence to dba_planet;
grant dba to dba_planet;
exec dbms_streams_auth.grant_admin_privilege('dba_planet');

alter user dba_planet quota unlimited on system;
alter user dba_planet quota unlimited on sysaux;


--prepare the database

--supplemental logging should be enabled in database level

alter database force logging;
alter database add supplemental log data;


--check the database-level supplemental logging settings

select SUPPLEMENTAL_LOG_DATA_MIN,
       SUPPLEMENTAL_LOG_DATA_PK,
       SUPPLEMENTAL_LOG_DATA_UI from v$database;


--prepare the instance

--depending on the configuration there will be some process overhead when enabling CDC
--so the instance parameters should be reviewed

select name, value, isspecified
  from v$spparameter
  where name in
    ('compatible',
     'java_pool_size',
     'job_queue_processes',
     'parallel_max_servers',
     'processes',
     'sessions',
     'streams_pool_size',
     'undo_retention');


--minimum value of the undo_retention parameter should be 7200sec

alter system set undo_retention=7200 scope=both sid='*';



--prepare source tables

--create necessary log groups
--every column to be change-tracked should have supplemental logging enabled
--I preferred to create individual log groups for every column I will track

alter table sch_dmall.product add supplemental log data (all) columns;
alter table sch_dmall.product add supplemental log data (primary key) columns;
alter table sch_dmall.product add supplemental log data (foreign key) columns;
alter table sch_dmall.product add supplemental log data (unique) columns;

alter table sch_dmall.product add supplemental log group lg_product1 (id, deleted, seller_id,lastmodifieddate,category_id,dis_price,price,title,subtitle,productstatus,salestatus, urlwords) always;
alter table sch_dmall.product add supplemental log group lg_product2 (id) always;
alter table sch_dmall.product add supplemental log group lg_product3 (deleted) always;
alter table sch_dmall.product add supplemental log group lg_product4 (seller_id) always;
alter table sch_dmall.product add supplemental log group lg_product5 (lastmodifieddate) always;
alter table sch_dmall.product add supplemental log group lg_product6 (category_id) always;
alter table sch_dmall.product add supplemental log group lg_product7 (dis_price) always;
alter table sch_dmall.product add supplemental log group lg_product8 (price) always;
alter table sch_dmall.product add supplemental log group lg_product9 (title) always;
alter table sch_dmall.product add supplemental log group lg_product10 (subtitle) always;
alter table sch_dmall.product add supplemental log group lg_product11 (productstatus) always;
alter table sch_dmall.product add supplemental log group lg_product12 (salestatus) always;
alter table sch_dmall.product add supplemental log group lg_product13 (urlwords) always;



It begins with the famous Streams starting point, the prepare_table_instantiation procedure, which records the current SCN of the table to be change-captured. This SCN tells the CDC processes from which point to start capturing changes.

begin
--dbms_capture_adm.build;
dbms_capture_adm.prepare_table_instantiation(table_name => 'sch_dmall.product');
end;
/




Now we should create the change set to associate with the CDC table. One change set can include one or more CDC tables to process.

--if it exists, first drop the change set
exec dbms_cdc_publish.drop_change_set('CS_PRODUCT');

--create the change set
begin
  dbms_cdc_publish.create_change_set(
    change_set_name => 'CS_PRODUCT',
    description => 'Change set for test',
    change_source_name => 'HOTLOG_SOURCE',
    stop_on_ddl => 'n',
    begin_date => sysdate,
    end_date => null);
end;
/



After successfully creating the change set, it is time to create the change table. This is important because the columns to be change-captured and the source table are defined here by the column_type_list and source_table parameters.

--if it exists, first drop the change table
exec dbms_cdc_publish.drop_change_table('SCH_DMALL','PRODUCT_CDC','Y');

--create the change table
begin
  dbms_cdc_publish.create_change_table(
  owner             => 'sch_dmall',
  change_table_name => 'product_cdc',
  change_set_name   => 'CS_PRODUCT',
  source_schema     => 'SCH_DMALL',
  source_table      => 'PRODUCT',
  column_type_list => 'ID NUMBER, DELETED NUMBER, SELLER_ID NUMBER, LASTMODIFIEDDATE DATE, CATEGORY_ID NUMBER, ' || 
                      'DIS_PRICE NUMBER, PRICE NUMBER, TITLE VARCHAR2(255), SUBTITLE VARCHAR2(255), ' || 
                      'PRODUCTSTATUS VARCHAR2(255), SALESTATUS VARCHAR2(255), URLWORDS VARCHAR2(255) ',
  capture_values  =>'both',
  rs_id           => 'y',
  row_id          => 'y',
  user_id         => 'y',
  timestamp       => 'y',
  object_id       => 'n',
  source_colmap   => 'n',
  target_colmap   => 'y',
  options_string  => 'TABLESPACE TS_DMALL_DATA');
end;
/

After all of this, we can enable the change set to start the CDC capture.

begin
  dbms_cdc_publish.alter_change_set(
    change_set_name => 'CS_PRODUCT',
    enable_capture => 'y');
end;
/
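
Since the post's own CDC setup relies on Streams under the covers, a quick sanity check after enabling the change set is to look at the Streams views used elsewhere on this blog. This is just a minimal sketch; the underlying capture name is generated by CDC, so I do not filter on it.

--check that the underlying capture process exists and is running
select capture_name, status, captured_scn, applied_scn from dba_capture;
select capture_name, state from v$streams_capture;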

When examining the product_cdc table where the changes are collected, you will notice some columns ending with a '$' sign. These columns hold internal information of the Streams CDC processes (they are called control columns) and are very helpful for identifying each change row. The most useful ones are the 'operation$' and 'commit_timestamp$' columns;

operation$ : D (delete), UN (update, after image), UO (update, before image), I (insert)
commit_timestamp$ : commit timestamp
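
For example, here is a minimal sketch of a query against the change table created above that shows the control columns next to the captured key column:

--inspect the latest captured changes and their control columns
select operation$, commit_timestamp$, cscn$, id
  from sch_dmall.product_cdc
 order by cscn$ desc;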

Before handing the data over to the development teams, creating a view can be a good idea. In this case I created the following view for the development team to consume. Cheers, Ergem.


create or replace view sch_dmall.v_product_cdc as
select
  case
    when operation$='I' then 'I'
    when operation$='UN' and deleted=0 then 'U'
    when operation$='UN' and deleted=1 then 'D'
    when operation$='D' then 'D'
    else operation$
  end as operation,
  id as product_id,
  deleted,
  seller_id,
  category_id,
  dis_price,
  price,
  title,
  subtitle,
  productstatus,
  salestatus,
  cscn$ id,
  commit_timestamp$ create_date
from sch_dmall.product_cdc
where operation$ in ('D','I','UN');



References:
http://docs.oracle.com/cd/E11882_01/server.112/e25554/cdc.htm#DWHSG016

Monday, August 8, 2011

Howto Recover Archive Gap in Streams Configuration

When I realized that my archived logs were not shipping to the destination database (both databases are version 10.2.0.4), which uses a downstream capture process for Streams replication, it was already too late: I had missed around 20 archived logs. I fixed the problem, which originated from a mismatch between the service definition in the log_archive_dest_2 system parameter and the TNS alias. But what about the missing archived logs?

--SOURCE DB
SQL> select name, value from v$parameter where name = 'log_archive_dest_2';

NAME               VALUE
----               -----
log_archive_dest_2 SERVICE=ODSD ASYNC NOREGISTER
                   VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE)   
                   DB_UNIQUE_NAME=ODSD


After correcting the entry in the tnsnames.ora file, log shipping resumed from where it had paused. I tested it by simply archiving the current redo log.

--SOURCE DB
SQL> alter system archive log current;
System altered.

SH> ls
-rw-r-----    1 oracle   dba      43451392 Aug 05 09:00 1_21759_657122256.dbf
-rw-r-----    1 oracle   dba       8708608 Aug 05 09:57 1_21760_657122256.dbf

--DESTINATION DB
SH> ls
-rw-r-----    1 oracle   dba      15930368 Aug  4 16:38 1_21727_657122256.dbf
-rw-r-----    1 oracle   dba       8708608 Aug  5 09:58 1_21760_657122256.dbf


Now the question is how to recover the gap between the last archived log and the one created approximately 12 hours earlier. First I should determine exactly which archived logs have to be carried from the source to the target. I will check the capture process and see which SCN it is waiting for. As I am using a downstream capture process, I should check the capture process on the target database.

--DESTINATION DB
SQL> select capture_name, state from v$streams_capture;

CAPTURE_NAME       STATE
------------       -----
CAPTURE_TABLE_GRP1 WAITING FOR DICTIONARY REDO: SCN 7523421102323


I found the SCN the target is waiting for. Now, which archived log on the source database contains this SCN?

--SOURCE DB
SQL> SELECT 
  name, dest_id, sequence#, first_change#, next_change#, completion_time 
FROM V$ARCHIVED_LOG where 7523421102323 between first_change# and next_change#

NAME                               DEST_ID SEQUENCE# FIRST_CHANGE# COMPLETION_TIME
----                               ------- -------- ------------- ---------------
/oracle/.../1_21728_657122256.dbf  1       21728    7523421102321 04/08/11 16:38:52
ODSD,                              2       21728    7523421102321 04/08/11 16:38:54
/oracle/.../1_21729_657122256.dbf  1       21729    7523421102323 04/08/11 16:39:10


It seems that the archived logs after sequence# 21728 were not shipped to the destination database. What I will do is copy these archived logs manually from the source host to the destination host using OS commands.
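
To know exactly which files to copy, the gap range can be listed from v$archived_log on the source. A minimal sketch; the sequence range 21729-21759 is my reading of the listings above:

--list the archived logs in the gap so they can be copied with OS commands
select name, sequence#, completion_time
  from v$archived_log
 where dest_id = 1
   and sequence# between 21729 and 21759
 order by sequence#;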

After copying the archived logs, I have to register them in the Streams configuration. The beginning of the register command is similar to the one we already know. And a small oops!...

--DESTINATION DB
SQL> alter database register logical logfile '/oracle/admin/ODSD/archive/1_21759_657122256.dbf';

ORA-16225: Missing LogMiner session name for Streams

SQL> select id, name, source_database from DBA_LOGMNR_SESSION;

ID NAME               SOURCE_DATABASE
-- ----               ---------------  
2  CAPTURE_TABLE_GRP1 CORED.CEB.LOCAL

SQL> alter database register logical logfile '/oracle/admin/ODSD/archive/1_21759_657122256.dbf' for 'CAPTURE_TABLE_GRP1';

Database altered.

SQL> select logmnr_session_id, name from DBA_LOGMNR_LOG; --or dba_registered_archived_log

LOGMNR_SESSION_ID NAME
----------------- ----
2                 /oracle/admin/ODSD/archive/1_21759_657122256.dbf
2                 /oracle/admin/ODSD/archive/1_21760_657122256.dbf
2                 /oracle/admin/ODSD/archive/1_21761_657122256.dbf


Let's check the capture process again. And a second oops!..

--DESTINATION DB
SQL> select capture_name, state from v$streams_capture;

no rows selected.

SQL> select capture_name, status, captured_scn, applied_scn from dba_capture;

CAPTURE_NAME  STATUS CAPTURED_SCN APPLIED_SCN
------------  ------ ------------ -----------
CAPTURE_TABLE_GRP1 ABORTED 7523421102323 7523421102323


It seems the capture process aborted while we were registering the archived logs. Maybe we just need to restart it.


SQL> exec DBMS_CAPTURE_ADM.START_CAPTURE('CAPTURE_TABLE_GRP1');

SQL> select capture_name, state from v$streams_capture;

CAPTURE_NAME       STATE
------------       -----
CAPTURE_TABLE_GRP1 WAITING FOR DICTIONARY REDO: SCN 7523573032011


It seems the problem is solved and the capture process moves on with the next SCN and the following archived logs.

Friday, April 2, 2010

Schema replication with Oracle Streams

I had tested table replication with Oracle Streams before, and now I used schema replication with an OLTP tester schema. The source database is 10.2.0.4 on Windows and the destination is an 11.1.0.7 database on the same host running Windows Server 2008. I faced a couple of problems, listed at the end of this post. The purpose of this post is to set up schema replication using Streams, then start the OLTP test and monitor the Streams processes. Piece of cake!

Let's start the schema replication with MAINTAIN_SCHEMAS, which is the easiest way; Streams does the rest (creating the queues, capture, propagation and apply processes, etc.). But before running the Streams packages, some preconfiguration of the database has to be done (a minimal sketch of these steps follows the list below):

- Enabling Archivelog
- Preconfiguring Oracle Streams
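
For reference, here is a minimal sketch of that preconfiguration, assuming the streams administrator U_STREAMS_ADM and the TNS alias ORCL11G used throughout this post; make sure both databases are in archivelog mode as well, and adapt names and passwords to your environment.

--create the streams administrator on both databases and grant the streams admin privilege
create user U_STREAMS_ADM identified by password;
grant connect, resource, dba to U_STREAMS_ADM;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('U_STREAMS_ADM', TRUE);

--on the source, create a database link to the destination (TNS alias ORCL11G assumed)
create database link ORCL11G connect to U_STREAMS_ADM identified by password using 'ORCL11G';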

And after all we should be ready for streams replication.
BEGIN
DBMS_STREAMS_ADM.MAINTAIN_SCHEMAS(
schema_names => 'U_EPEKER',
source_directory_object => 'DATA_PUMP_DIR',
destination_directory_object => 'DATA_PUMP_DIR',
source_database => 'ORCL',
destination_database => 'ORCL11G',
perform_actions => TRUE,
capture_name => 'CAPTURE_SCHEMA_U_EPEKER',
--capture_queue_table => NULL,
capture_queue_name => 'QUEUE_CAPT_SCH_U_EPEKER',
capture_queue_user => 'U_STREAMS_ADM',
propagation_name => 'PROP_SCH_U_EPEKER',
apply_name => 'APPLY_SCH_U_EPEKER',
--apply_queue_table => NULL,
apply_queue_name => 'QUEUE_APP_SCH_U_EPEKER',
apply_queue_user => 'U_STREAMS_ADM',
bi_directional => FALSE,
include_ddl => TRUE,
instantiation => DBMS_STREAMS_ADM.INSTANTIATION_SCHEMA_NETWORK);
END;

In case of any error, the following DBA views can be queried to see which script step failed. In a hopeless situation you can recover the operation, remove the Streams configuration and re-run MAINTAIN_SCHEMAS after fixing the misconfiguration.

select * from DBA_RECOVERABLE_SCRIPT_BLOCKS;
select * from DBA_RECOVERABLE_SCRIPT_ERRORS;
execute DBMS_STREAMS_ADM.RECOVER_OPERATION('3A53A54E70764958BA0AFA68DAC0C7F0','PURGE');
exec DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION;
-- and on the remote server
exec DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION@ORCL11G;


Let's check whether the Streams processes were set up correctly by the MAINTAIN_SCHEMAS procedure.

select capture_name,
queue_name,
rule_set_name,
source_database,
status
from dba_capture;
/*
CAPTURE_NAME QUEUE_NAME RULE_SET_NAME SOURCE_DATABASE STATUS
------------ ---------- ------------- --------------- ------
CAPTURE_SCHEMA_U_EPEKER QUEUE_CAPT_SCH_U_EPEKER RULESET$_12 ORCL ENABLED
*/


select propagation_name,
source_queue_name,
destination_queue_name,
destination_dblink,
rule_set_name,
status
from dba_propagation;

/*
PROPAGATION_NAME SOURCE_QUEUE_NAME DESTINATION_QUEUE_NAME DESTINATION_DBLINK RULE_SET_NAME
--------------- ---------------- --------------------- ----------------- -------------
PROP_SCH_U_EPEKER QUEUE_CAPT_SCH_U_EPEKER QUEUE_APP_SCH_U_EPEKER ORCL11G RULESET$_9
*/


select sid,
serial#,
capture_name,
state
from v$streams_capture;

/*
SID SERIAL# CAPTURE_NAME STATE
--- ------ ------------ -----
128 8 CAPTURE_SCHEMA_U_EPEKER CAPTURING CHANGES
*/


Everything seems operational on the capture side. I need to check whether the apply process was also created and runs smoothly.

select apply_name,
queue_name,
apply_captured,
rule_set_name,
status
from dba_apply@ORCL11G;

/*
APPLY_NAME QUEUE_NAME APPLY_CAPTURED RULE_SET_NAME STATUS
--------- ---------- -------------- ------------- ------
APPLY_SCH_U_EPEKER QUEUE_APP_SCH_U_EPEKER YES RULESET$_6 ENABLED
*/


select sid,
serial#,
state,
apply#,
apply_name,
total_applied,
total_errors
from v$streams_apply_coordinator@ORCL11G;

/*
SID SERIAL# STATE APPLY# APPLY_NAME
--- ------ ----- ----- ----------
116 19550 IDLE 1 APPLY_SCH_U_EPEKER
*/


I had created this schema before; it includes the OLTP simulator procedures. I am planning to share this schema on this blog later. Anyway, here is the script to create the schema: OLTP Simulator Script

Now I will use the P_CREATE_REPOS procedure to populate the schema, and I expect Streams to replicate the transactions. This procedure will create 5 groups with 10 users in each group, and then a page for each user.

exec U_EPEKER.P_CREATE_REPOS(5,10,1);

After executing this procedure there should be 5 groups, 10 users in each group, and 50 pages in total (1 page per user).

select 'source ' || count(*) as user_count from u_epeker.t_user
union all
select 'destination ' || count(*) as user_count from u_epeker.t_user@ORCL11G;

/*
USER_COUNT
source 50
destination 50
*/


select 'source ' || count(*) as group_count from u_epeker.t_group
union all
select 'destination ' || count(*) as group_count from u_epeker.t_group@ORCL11G;

/*
GROUP_COUNT
source 5
destination 5
*/


select 'source ' || count(*) as page_count from u_epeker.t_pages
union all
select 'destination ' || count(*) as page_count from u_epeker.t_pages@ORCL11G;

/*
PAGE_COUNT
source 50
destination 50
*/


Everything seems to be going fine. Now I will simulate OLTP activity on the source database and monitor the Streams processes. For this I will use the P_CREATE_JOBS procedure of the OLTP simulator schema, which implicitly calls the P_SIMULATE_REQUEST procedure sequentially.

-- with U_EPEKER user
exec U_EPEKER.P_CREATE_JOBS(20,1,'SIMULATE_JOBS',1);


With these parameters the procedure should create 20 jobs, each with a 1-second interval, which means we are filling up T_VISIT_STATS_DETAILED. And yes, it works; our replication should work too.

select count(*) from U_EPEKER.T_VISIT_STATS_DETAILED
union all
select count(*) from U_EPEKER.T_VISIT_STATS_DETAILED@ORCL11G;

/*
COUNT(*)
-------
385
376

on the second run

COUNT(*)
-------
385
385
*/


So the destination comes in a few seconds behind (the time needed to ship the logs, I think). What about the capture and the apply processes? I can see the TOTAL_MESSAGES_CAPTURED and TOTAL_APPLIED values getting higher.

select state, capture_name, total_messages_captured from v$streams_capture;
/*
STATE,CAPTURE_NAME,TOTAL_MESSAGES_CAPTURED
CAPTURING CHANGES,CAPTURE_SCHEMA_U_EPEKER,112817
*/


select state, apply_name, total_applied from v$streams_apply_coordinator@ORCL11G;
/*
STATE,APPLY_NAME,TOTAL_APPLIED
IDLE,APPLY_SCH_U_EPEKER,753
*/


Bonus: excluding a table from the schema replication. It works!

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES
(
table_name => 'u_epeker.t_visit_stats_detailed',
streams_type => 'capture',
streams_name => 'capture_schema_u_epeker',
queue_name => 'queue_capt_sch_u_epeker',
include_dml => true,
include_ddl => true,
source_database => 'orcl',
inclusion_rule => false --specifies the negative rule set
);
END;

exec DBMS_RULE_ADM.DROP_RULE('T_VISIT_STATS_DETAILED50',true);
exec DBMS_RULE_ADM.DROP_RULE('T_VISIT_STATS_DETAILED51',true);


I can drop the simulation jobs and streams configuration for now.

-- with u_epeker
exec U_EPEKER.P_DROP_JOBS;
exec DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION;
-- and on the remote server
exec DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION@ORCL11G;


I faced some problems while creating this schema replication configuration with Streams.

The weird problem was that the logs could not be shipped from the source to the destination. When I examined the alert.log file of the source database I saw lots of ORA-01031 Insufficient Privileges errors.
After a little search on OTN I solved the problem by recreating the password file from scratch with orapwd. A strange problem, and I still don't know the underlying reason, but anyway it is solved.

I also got an error during instantiation. As this is a schema replication, don't forget to create the same tablespace on the destination, or else the user/schema creation fails. For example:
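
A minimal sketch of what I mean; the tablespace name TS_U_EPEKER and the datafile path here are made up for illustration and should match whatever the source schema actually uses:

--create the matching tablespace on the destination before instantiation
create tablespace TS_U_EPEKER datafile 'D:\oracledb11g\oradata\orcl11g\TS_U_EPEKER_01.dbf'
size 10M autoextend on next 10M maxsize 100M extent management local segment space management auto;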

And last, an ORA-00600 problem: ORA-00600: internal error code, arguments: [kwqbmcrcpts101], [], [], [], [], [], [], []. I noticed this one after I had dropped the whole Streams configuration from both databases; two days later I realized the host had run out of disk space because of lots of dump files under bdump. After a search on OTN, I think I hit a 10.2.0.3 bug which occurs in Streams environments. As this is a test environment I bounced the database, and the problem went away since I had already deleted the Streams configuration. The subject on Oracle Forums

Thursday, April 1, 2010

"PAUSED" state for streams capture process

My test replication somehow got broken and does not apply changes to the target database (seems like a new challenge for me!). The first thing I checked was the Streams capture process, and I saw the weird message below.

select sid,
serial#,
capture_name,
state
from v$streams_capture;

/*
SID SERIAL# CAPTURE_NAME STATE
-- ------- ------------ -----
121 1524 CAPTURE_SCHEMA_U_EPEKER PAUSED FOR FLOW CONTROL
*/


Now, since the capture process seems paused, maybe I need to check what is going on with the apply process. But it seems nothing has been received from the capture process, so I think the problem should be on the capture side.

select sid,
serial#,
state,
apply#,
apply_name,
total_applied,
total_errors
from v$streams_apply_coordinator@ORCL11G;

/*
SID SERIAL# STATE APPLY# APPLY_NAME TOTAL_APPLIED TOTAL_ERRORS
--- ------ ---- ------ ----------- ------------- ------------
110 4208 IDLE 1 APPLY_SCH_U_EPEKER 0 0
*/


I still didn't understand anything from this "PAUSED FOR FLOW CONTROL" message. I did a little search on Google and, as always, ended up at the Oracle documentation. It seems I need to check the streams_pool_size parameter.

select queue_name, sender_name, num_msgs, unbrowsed_msgs, memory_usage, publisher_state from V$BUFFERED_PUBLISHERS;

/*
QUEUE_NAME SENDER_NAME NUM_MSGS UNBROWSED_MSGS MEMORY_USAGE PUBLISHER_STATE
--------- ----------- -------- -------------- ----------- ---------------
QUEUE_CAPT_SCH_U_EPEKER CAPTURE_SCHEMA_U_EPEKER 15057 15057 87 IN FLOW CONTROL: TOO MANY UNBROWSED MESSAGES
*/


If the memory usage column stands for MB (it seems to be 87), then there could be a problem. The following investigation shows that ASMM gives only 56 MB to the streams pool, so there seems to be a lack of memory. I decided to increase streams_pool_size to 200 MB.

select name, value from v$parameter where name = 'streams_pool_size';

/*
NAME VALUE
---- -----
streams_pool_size 0
*/

select name, value/1024/1024 as mb from v$parameter where name = 'sga_target';

/*
NAME MB
---- --
sga_target 512
*/


select pool, round(sum(bytes)/1024/1024) as MB from v$sgastat group by pool;
/*
POOL MB
---- --
140
java pool 12
streams pool 56
shared pool 300
large pool 4
*/


alter system set sga_target=1G scope=spfile;
alter system set streams_pool_size=200M scope=spfile;
shutdown immediate;
startup;


As soon as the database opened, the capture process was up again, and the publisher state is now "PUBLISHING MESSAGES".

select sid,
serial#,
capture_name,
state
from v$streams_capture;
/*
SID SERIAL# CAPTURE_NAME STATE
--- ------ ------------ -----
151 1 CAPTURE_SCHEMA_U_EPEKER CAPTURING CHANGES
*/


select queue_name, sender_name, num_msgs, unbrowsed_msgs, memory_usage, publisher_state from V$BUFFERED_PUBLISHERS;
/*
QUEUE_NAME SENDER_NAME NUM_MSGS UNBROWSED_MSGS MEMORY_USAGE PUBLISHER_STATE
---------- ----------- -------- -------------- ------------ ---------------
QUEUE_CAPT_SCH_U_EPEKER CAPTURE_SCHEMA_U_EPEKER 44 0 5 PUBLISHING MESSAGES
*/


Wednesday, October 14, 2009

Some Notes on Oracle Streams Table Replication

In this test, replication is done from an Oracle Database 11.1.0.7 to an 11.1.0.6 database on a virtual machine. This is not a performance test, only a functional test of the basic Streams abilities. The test is done on a table replication basis; maybe I can publish a schema replication based test in another article.

Some preparation is needed first, and I assume both databases are in archivelog mode. Actually the documentation says that only the source database needs to archive its redo logs, which is reasonable for capturing the changes without missing any. But my databases are both in archivelog mode, so this requirement is covered.
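
For completeness, this is roughly how a database is put into archivelog mode; a minimal sketch, run as SYSDBA:

--check the current mode
select log_mode from v$database;

--enable archivelog mode if needed
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;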


--check if there are unsupported column types.
select * from DBA_STREAMS_UNSUPPORTED;

--create a tablespace on the source database for test purposes
create tablespace TS_STREAMS_SRC DATAFILE 'D:\oracledb11g\oradata\orcl\TS_STREAMS_SRC_01.dbf'
SIZE 10M AUTOEXTEND ON NEXT 10M MAXSIZE 100M EXTENT MANAGEMENT LOCAL SEGMENT SPACE MANAGEMENT AUTO;

--create a tablespace on the destination database for test purposes
create tablespace TS_STREAMS_DEST DATAFILE 'D:\oracledb11g\oradata\orcl\TS_STREAMS_DEST_01.dbf'
SIZE 10M AUTOEXTEND ON NEXT 10M MAXSIZE 100M EXTENT MANAGEMENT LOCAL SEGMENT SPACE MANAGEMENT AUTO;

--creating the streams admin user on both of the databases.
create user U_STREAMS_ADM identified by password
DEFAULT TABLESPACE TS_STREAMS_SRC
QUOTA UNLIMITED ON TS_STREAMS_SRC;

--streams user should have dba privilege
grant connect,resource,dba to U_STREAMS_ADM;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('U_STREAMS_ADM',TRUE);

--create database links on the databases to access both from SOURCE to DESTINATION and from DESTINATION to SOURCE

--here ORCL_SOURCE is the tns alias for the SOURCE database and ORCL_DESTINATION is the tns alias for the DESTINATION database


create database link ORCL.SOURCE.LOCAL connect to U_STREAMS_ADM identified by password using 'ORCL_SOURCE';

create database link ORCL.DESTINATION.LOCAL connect to U_STREAMS_ADM identified by password using 'ORCL_DESTINATION';

--a second destination should be defined on the source database to send the redo logs to the destination database
alter system set LOG_ARCHIVE_DEST_2='SERVICE=ORCL ASYNC NOREGISTER VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=ORCL';

-- creating a table replication with MAINTAIN_TABLES
-- with this configuration CAPTURE and PROPAGATION processes are on the source database
-- APPLY process is on the DESTINATION database

DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := 'scott.dept';
DBMS_STREAMS_ADM.MAINTAIN_TABLES(
table_names => tables,
source_directory_object => NULL,
destination_directory_object => NULL,
source_database => 'ORCL.SOURCE.LOCAL',
destination_database => 'ORCL.DESTINATION.LOCAL',
capture_name => 'CAPTURE_TABLE_GRP1',
capture_queue_name => 'QUEUE_CAPT_TABLE_GRP1',
capture_queue_user => 'U_STREAMS_ADM',
propagation_name => 'PROPAGATE_TABLE_GRP1',
apply_name => 'APPLY_TABLE_GRP1',
apply_queue_name => 'QUEUE_APPLY_TABLE_GRP1',
apply_queue_user => 'U_STREAMS_ADM',
perform_actions => TRUE,
script_directory_object => NULL,
script_name => NULL,
bi_directional => FALSE,
include_ddl => TRUE,
instantiation => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK);
END;


The procedure worked smoothly, while I was half expecting it to end with an exception. But if there is a problem with the instantiation of the replication, the following set of commands helps you clear the incomplete configuration, and then you can run MAINTAIN_TABLES again.


SELECT * FROM DBA_RECOVERABLE_SCRIPT_ERRORS;
execute DBMS_STREAMS_ADM.RECOVER_OPERATION('A6B753822B9C4C30A5CD87B9571ACF03','PURGE');
exec DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION();

--The insert tests just worked fine as follows.
insert into SCOTT.DEPT values (60,'IT-DEVELOPMENT','IST');
insert into SCOTT.DEPT values (70,'IT-STORAGE','IST');
insert into SCOTT.DEPT values (80,'IT-BACKUP','IST');
insert into SCOTT.DEPT values (90,'IT-WINDOWS','IST');
insert into SCOTT.DEPT values (95,'IT-UNIX','IST');
insert into SCOTT.DEPT values (96,'IT-SECURITY','IST');
commit;

--Findout the configuration and status of the processes
select * from dba_capture;
select * from dba_propagation;
select * from dba_apply;
select * from v$streams_capture;


Let's see whether the configuration is flexible enough to easily add a new table rule to the existing set. I followed the documentation with the following commands, in order.


--on source database
exec DBMS_CAPTURE_ADM.STOP_CAPTURE('CAPTURE_TABLE_GRP1');
exec DBMS_PROPAGATION_ADM.STOP_PROPAGATION('PROPAGATE_TABLE_GRP1');

--on the destination database
exec DBMS_APPLY_ADM.STOP_APPLY('APPLY_TABLE_GRP1');

--add new rule to the CAPTURE process
exec DBMS_STREAMS_ADM.ADD_TABLE_RULES(
TABLE_NAME =>'SCOTT.EMP',
STREAMS_TYPE => 'CAPTURE',
STREAMS_NAME => 'CAPTURE_TABLE_GRP1',
SOURCE_DATABASE => 'ORCL.SOURCE.LOCAL',
QUEUE_NAME => 'QUEUE_CAPT_TABLE_GRP1',
INCLUDE_DML => TRUE,
INCLUDE_DDL => TRUE,
--DDL_RULE_NAME => NULL,
--DML_RULE_NAME => NULL,
INCLUSION_RULE => TRUE,
AND_CONDITION => NULL);

--add new rule to the PROPAGATION process
exec DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
TABLE_NAME => 'SCOTT.EMP',
STREAMS_NAME => '', --if propagation exists on source_queue_name then uses the existing propagation
SOURCE_QUEUE_NAME => 'U_STREAMS_ADM.QUEUE_CAPT_TABLE_GRP1',
DESTINATION_QUEUE_NAME => 'U_STREAMS_ADM.QUEUE_APPLY_TABLE_GRP1@ORCL.DESTINATION.LOCAL',
INCLUDE_DML => TRUE,
INCLUDE_DDL => TRUE,
SOURCE_DATABASE => 'ORCL.SOURCE.LOCAL',
INCLUSION_RULE => TRUE,
AND_CONDITION => NULL,
QUEUE_TO_QUEUE => TRUE --TRUE=queue_to_queue FALSE=queue_to_dblink
);

--now the last thing is to add a new rule to the APPLY process
exec DBMS_STREAMS_ADM.ADD_TABLE_RULES(
TABLE_NAME =>'SCOTT.EMP',
STREAMS_TYPE => 'APPLY',
STREAMS_NAME => 'APPLY_TABLE_GRP1',
SOURCE_DATABASE => 'ORCL.SOURCE.LOCAL',
QUEUE_NAME => 'QUEUE_APPLY_TABLE_GRP1',
INCLUDE_DML => TRUE,
INCLUDE_DDL => TRUE,
--DDL_RULE_NAME => NULL,
--DML_RULE_NAME => NULL,
INCLUSION_RULE => TRUE,
AND_CONDITION => NULL);


We can now check whether the new rules were added, and what their conditions are, with the following views (an example query follows the list). We cannot see any new process configuration, since I added the rules to the existing APPLY, PROPAGATION and CAPTURE processes.

DBA_RULES
DBA_RULE_SETS
DBA_STREAMS_RULES
DBA_RULE_SET_RULES
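
For instance, a minimal sketch that lists the rules now attached to the capture process (I am assuming the RULE_CONDITION column is available in DBA_STREAMS_RULES, as in the documentation examples):

--list the rules attached to the capture process after adding SCOTT.EMP
select streams_type, streams_name, rule_name, rule_condition
  from dba_streams_rules
 where streams_name = 'CAPTURE_TABLE_GRP1';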

If you did something wrong while adding the rules, you can easily delete them by running the following procedures of DBMS_RULE_ADM. Be sure what you are deleting by selecting from DBA_STREAMS_RULES; you don't want to delete a rule associated with a working process.


--If you want to get rid of a rule, first remove it from its rule set, then drop it
DBMS_RULE_ADM.REMOVE_RULE
DBMS_RULE_ADM.DROP_RULE
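
A hedged sketch of how those two calls look; the rule name EMP_RULE_EXAMPLE and the rule set RULESET$_12 below are just placeholders, not names from my configuration:

--remove the rule from its rule set, then drop it from the database
exec DBMS_RULE_ADM.REMOVE_RULE(rule_name => 'U_STREAMS_ADM.EMP_RULE_EXAMPLE', rule_set_name => 'U_STREAMS_ADM.RULESET$_12');
exec DBMS_RULE_ADM.DROP_RULE(rule_name => 'U_STREAMS_ADM.EMP_RULE_EXAMPLE', force => false);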


I added the rules and now I should instantiate the table(s) manually. The MAINTAIN_TABLES procedure does this instantiation automatically for you, but if you add the rules explicitly, you need to instantiate the tables yourself.


--set the instantiation SCN of the SOURCE table/tables
DECLARE
iscn NUMBER; --Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'scott.emp',
source_database_name => 'ORCL.SOURCE.LOCAL',
instantiation_scn => iscn);
END;

--Create the table on the DESTINATION database.
create table scott.emp as select * from scott.emp@ORCL.SOURCE.LOCAL;

--The last thing is not to forget the SUPPLEMENTAL LOGGING for the replicated table to capture and apply the changes.
ALTER TABLE scott.emp ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

--After checking all the configuration again now I can start the processes i have stopped.
-- on source database.

exec DBMS_CAPTURE_ADM.START_CAPTURE('CAPTURE_TABLE_GRP1');
exec DBMS_PROPAGATION_ADM.START_PROPAGATION('PROPAGATE_TABLE_GRP1');

--On the destination database.
exec DBMS_APPLY_ADM.START_APPLY('APPLY_TABLE_GRP1');


It seems everything worked fine, as the Oracle documentation stated. Adding rules to working Streams processes is easy, except that you have to instantiate the tables manually via create table .. as .., export/import or expdp/impdp when you add the rules yourself. Now I want to go further and alter a replication rule of the existing configuration so that only a subset of the changes is captured.

Rules can be seen by selecting from DBA_RULES, DBA_RULE_SETS, DBA_RULE_SET_RULES or DBA_STREAMS_RULES, which is the one I prefer. One should be careful about which rule to alter: there are DML and DDL rules for every process, and the rule to be altered should belong to the correct capture process (since there can be more than one capture process).

In my case the rule that I am going to alter is BONUS85, as follows. Stopping the processes before any change in the configuration is a good idea; without stopping and restarting the processes, the new configuration is not picked up by the existing Streams processes.


--On source database
exec DBMS_CAPTURE_ADM.STOP_CAPTURE('CAPTURE_TABLE_GRP1');
exec DBMS_PROPAGATION_ADM.STOP_PROPAGATION('PROPAGATE_TABLE_GRP1');

--On the destination database
exec DBMS_APPLY_ADM.STOP_APPLY('APPLY_TABLE_GRP1');

--Altering a rule capturing a subset of the data in the table.
execute dbms_rule_adm.alter_rule('BONUS85',':dml.get_object_owner()=''SCOTT'' AND :dml.get_object_name()=''BONUS''
AND :dml.is_null_tag()=''Y'' AND :dml.get_source_database_name() = ''ORCL.DIGITURK.LOCAL''
AND (:dml.get_value(''NEW'',''ENAME'').AccessVarchar2()=''ERGEM'')');

--On source database
exec DBMS_CAPTURE_ADM.START_CAPTURE('CAPTURE_TABLE_GRP1');
exec DBMS_PROPAGATION_ADM.START_PROPAGATION('PROPAGATE_TABLE_GRP1');

--On the destination database
exec DBMS_APPLY_ADM.START_APPLY('APPLY_TABLE_GRP1');

--Related views for streams monitoring
select * from v$streams_apply_coordinator
select * from v$streams_apply_reader
select * from v$streams_apply_server
select * from v$streams_capture
select * from v$streams_pool_advice
select * from v$streams_transaction
select * from v$streams_monitor


In conclusion, Oracle Streams is easy to configure and use. It seems flexible: you can change the configuration easily and alter existing rules to capture a subset of the changes. I did not test the performance, but the functional tests look OK.

Of course there are a lot of options you can use: downstream capture processes, synchronous captures, parallel capture/apply processes and so on. The Oracle documentation has sufficient information about Streams.

Related:
Oracle Streams Replication Administrators Guide
Oracle Streams Concepts and Administration
PSOUG Streams Demo