Initial Setup
Instance
Setup
In order to begin the following parameters should be set in the spfiles
of participating databases:
ALTER SYSTEM SET
JOB_QUEUE_PROCESSES=1;
ALTER SYSTEM SET AQ_TM_PROCESSES=1;
ALTER SYSTEM SET GLOBAL_NAMES=TRUE;
ALTER SYSTEM SET COMPATIBLE='9.2.0' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_PARALLELISM=1 SCOPE=SPFILE;
SHUTDOWN IMMEDIATE;
STARTUP;
In addition, any databases involved in capture (nuc1) must be in ARCHIVELOG
mode.
Stream Administrator Setup
Next we create
a stream administrator, a stream queue table and a database link on
the source database:
CONN sys/password@nuc1
AS SYSDBA
CREATE USER strmadmin
IDENTIFIED BY strmadminpw
DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;
GRANT CONNECT,
RESOURCE, SELECT_CATALOG_ROLE TO strmadmin;
GRANT EXECUTE ON
DBMS_AQADM TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK TO strmadmin;
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
CONNECT strmadmin/strmadminpw
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();
CREATE DATABASE
LINK TEST CONNECT TO strmadmin IDENTIFIED BY strmadminpw USING 'TEST';
This process must
be repeated on the destination database (TEST). The reverse database
link is not necessary in this example but the following grant must be
added:
GRANT ALL ON scott.dept
TO strmadmin;
LogMiner Tablespace Setup
Next we create a new tablespace to hold the logminor tables on the source
database: nuc
CONN sys/password@nuc
AS SYSDBA
CREATE TABLESPACE
logmnr_ts DATAFILE '/export/u01/oradata/TEST/logmnr01.dbf' SIZE 25 M
REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('logmnr_ts');
Supplemental
Logging
The apply process requires additional information for some actions so
we must configure suplimental logging of primary key information for
tables of interest:
CONN sys/password@nuc
AS SYSDBA
ALTER TABLE scott.dept ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk
(deptno) ALWAYS;
Propagation
Configure the propagation
process on nuc:
CONNECT strmadmin/strmadminpw@nuc
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'scott.dept',
streams_name => 'nuc_to_TEST',
source_queue_name => 'strmadmin.streams_queue',
destination_queue_name => 'strmadmin.streams_queue@test',
include_dml => true,
include_ddl => true,
source_database => 'nuc');
END;
/
The propagation
is performed using a job which can be monitored using:
SELECT job,
TO_CHAR(last_date, 'DD-Mon-YYYY HH24:MI:SS') last_date,
TO_CHAR(next_date, 'DD-Mon-YYYY HH24:MI:SS') next_date,
what
FROM dba_jobs;
Capture
Configure the capture process on nuc:
CONNECT strmadmin/strmadminpw@nuc
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'scott.dept',
streams_type => 'capture',
streams_name => 'capture_simp',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => true);
END;
/
Configure Instantiation
SCN
The instantiation
SCN of the source table must be configured in the destination table
before the apply process will work. If the destination table is
already present this can be accomplished using a metadata only export/import:
exp userid=scott/tiger@nuc
FILE=dept_instant.dmp TABLES=dept OBJECT_CONSISTENT=y ROWS=n
imp userid=scott/tiger@TEST FILE=dept_instant.dmp IGNORE=y COMMIT=y
LOG=import.log STREAMS_INSTANTIATION=y
During the transfer
of the meta information the supplematal logging was also transferred.
Since no capture is done on TEST this can be removed:
CONN sys/password@TEST
AS SYSDBA
ALTER TABLE scott.dept DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;
Alternatively the
instantiation SCN can be set using the DBMS_APPLY_ADM package:
CONNECT strmadmin/strmadminpw@nuc
DECLARE
v_scn NUMBER;
BEGIN
v_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@TEST(
source_object_name => 'scott.dept',
source_database_name => 'nuc',
instantiation_scn => v_scn);
END;
Apply
Configure
the apply process on the destination database (TEST):
CONNECT strmadmin/strmadminpw@TEST
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'scott.dept',
streams_type => 'apply',
streams_name => 'apply_simp',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => true,
source_database => 'nuc');
END;
/
Start Apply Process
Start the apply process on destination database (TEST) and prevent errors
stopping the process:
CONNECT strmadmin/strmadminpw@TEST
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_simp',
parameter => 'disable_on_error',
value => 'n');
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_simp');
END;
Start Capture Process
Start the capture process on the source database (nuc):
CONNECT strmadmin/strmadminpw@nuc
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_simp');
END;
/
Test It
With the streams activated we can see that DML changes to the source
table are visible in the destination table:
CONNECT scott/tiger@nuc
INSERT INTO dept (deptno, dname, loc) VALUES (99, 'Test Dept', 'UK');
COMMIT;
SELECT * FROM
dept;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
99 Test Dept UK
5 rows selected.
CONNECT scott/tiger@TEST
SELECT * FROM dept;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
99 Test Dept UK
5 rows selected.
We can also see
that DDL changes to the source table are reflected in the destination
table:
CONNECT scott/tiger@nuc
ALTER TABLE dept ADD (
new_col NUMBER(10)
)
/
DESC dept
Name Null? Type
---------------------------- -------- --------------
DEPTNO NOT NULL NUMBER(2)
DNAME VARCHAR2(14)
LOC VARCHAR2(13)
NEW_COL NUMBER(10)
CONNECT scott/tiger@TEST
DESC dept
Name Null? Type
---------------------------- -------- --------------
DEPTNO NOT NULL NUMBER(2)
DNAME VARCHAR2(14)
LOC VARCHAR2(13)
NEW_COL NUMBER(10)
The contents of
the streams can be viewed using statements like:
SELECT s.user_data.getTypeName()
FROM streams_queue_table s;
SET SERVEROUTPUT
ON
DECLARE
v_anydata SYS.ANYDATA;
v_lcr SYS.LCR$_ROW_RECORD;
v_row_list SYS.LCR$_ROW_LIST;
v_result PLS_INTEGER;
BEGIN
SELECT user_data
INTO v_anydata
FROM strmadmin.streams_queue_table
WHERE rownum < 2;
v_result := ANYDATA.GetObject(
self => v_anydata,
obj => v_lcr);
DBMS_OUTPUT.PUT_LINE('Command Type : ' || v_lcr.Get_Command_Type);
DBMS_OUTPUT.PUT_LINE('Object Owner : ' || v_lcr.Get_Object_Owner);
DBMS_OUTPUT.PUT_LINE('Object Name : ' || v_lcr.Get_Object_Name);
DBMS_OUTPUT.PUT_LINE('Source Database Name : ' || v_lcr.Get_Source_Database_Name);
END;
/
Clean Up
All rules can be identified and removed using the following statements:
BEGIN
FOR cur_rec IN (SELECT rule_owner, rule_name FROM dba_rules) LOOP
DBMS_RULE_ADM.DROP_RULE(
rule_name => cur_rec.rule_owner || '.' || cur_rec.rule_name,
force => TRUE);
END LOOP;
END;
/
All capture and
apply processes can be identified, stopped and dropped using:
BEGIN
FOR cur_rec IN (SELECT capture_name FROM dba_capture) LOOP
DBMS_CAPTURE_ADM.STOP_CAPTURE(
capture_name => cur_rec.capture_name);
DBMS_CAPTURE_ADM.DROP_CAPTURE(
capture_name => cur_rec.capture_name);
END LOOP;
FOR cur_rec IN
(SELECT apply_name FROM dba_apply) LOOP
DBMS_APPLY_ADM.STOP_APPLY(
apply_name => cur_rec.apply_name);
DBMS_APPLY_ADM.DROP_APPLY(
apply_name => cur_rec.apply_name);
END LOOP;
END;
/
All streams information
relating to a specific object can be purged using:
BEGIN
DBMS_STREAMS_ADM.PURGE_SOURCE_CATALOG(
source_database => 'nuc',
source_object_name => 'scott.dept',
source_object_type => 'TABLE');
END;