| Note: This demonstration illustrates basic techniques for implementing schema-level Streams capture and apply.
The database Orabase is my standard Windows implementation of the Oracle RDBMS on the laptop I use when traveling to conferences.
The second database, TargetDB, is configured on a second laptop running under Linux 5.2 with a connection between the two created using a GigE switch and some
Cat 5E cable. Use the library's setup page to create the required tablespaces and to perform other activities as required. |
| |
| Setup As SYS - Prepare Database and Instance (both databases) |
linux 192.168.1.91
windows 192.168.1.19
conn / as sysdba
-- validate Oracle parameters
-- (notes are kept on their own lines so they are never parsed as part of a SQL*Plus command)
-- document this
archive log list
-- min 3
show parameter aq_tm_processes
-- must be 11.2.0.0.0 or above
show parameter compatible
-- must be TRUE
show parameter global_names
-- min. 3600 (1 hr.)
show parameter undo_retention
-- altering initialization parameters examples
-- COMPATIBLE is a static parameter: it can only be written to the spfile and
-- takes effect at the instance restart performed below
alter system set compatible='11.2.0.0.0' scope=SPFILE;
alter system set global_names=TRUE scope=BOTH;
alter system set undo_retention=3600 scope=BOTH;
-- retest parameter after modification
-- restart into MOUNT state so the database can be switched to archivelog mode
shutdown immediate;
startup mount;
alter database archivelog;
alter database open; |
| |
| Setup As SYS - Create Streams Administrators -
On Source DB create user localadmin, on Target DB create user remoteadmin |
conn / as sysdba
-- Streams administrator account: no space in SYSTEM/SYSAUX, 50M in uwdata
CREATE USER localadmin IDENTIFIED BY localadmin
  DEFAULT TABLESPACE uwdata
  TEMPORARY TABLESPACE temp
  QUOTA 50M ON uwdata
  QUOTA 0 ON SYSTEM
  QUOTA 0 ON SYSAUX;
-- system privileges
GRANT create session, create database link, create procedure TO localadmin;
-- queue administration role
GRANT aq_administrator_role TO localadmin IDENTIFIED BY localadmin;
-- execute rights on the Streams packages used later in this demo
GRANT execute ON dbms_streams_adm TO localadmin;
GRANT execute ON dbms_streams_auth TO localadmin;
-- open accounts, sorted by user name
column username format a12
column profile format a19
SELECT username,
       created,
       expiry_date,
       profile,
       initial_rsrc_consumer_group
FROM dba_users
WHERE account_status = 'OPEN'
ORDER BY username;
-- system privileges held by LOCALADMIN
SELECT *
FROM dba_sys_privs
WHERE grantee = 'LOCALADMIN';
-- object privileges carried by AQ_ADMINISTRATOR_ROLE
column privilege format a15
column owner format a15
SELECT role,
       owner,
       table_name,
       privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY privilege, owner, table_name;
-- make LOCALADMIN a Streams administrator
BEGIN
  dbms_streams_auth.grant_admin_privilege('LOCALADMIN', TRUE);
END;
/
-- allow LOCALADMIN to be used for remote Streams administration
BEGIN
  dbms_streams_auth.grant_remote_admin_access('LOCALADMIN');
END;
/
-- list the registered Streams administrators
SELECT *
FROM dba_streams_administrator;
-- Streams pool advice rows belonging to the highest snap_id
SELECT dhspa.size_for_estimate,
       dhspa.size_factor,
       dhspa.estd_spill_count,
       dhspa.estd_unspill_count
FROM dba_hist_streams_pool_advice dhspa
WHERE dhspa.snap_id = (SELECT MAX(snap_id)
                       FROM dba_hist_streams_pool_advice);
-- time range covered by the logs listed in dba_logmnr_log
SELECT first_time, next_time
FROM dba_logmnr_log;
-- dba_logmnr_log has no rows at this point in time
Note: DBMS_LOGMNR_D.SET_TABLESPACE can be used to change the tablespace of the logminer tables if desired |
| |
| Configure TNSNAMES.ORA (both databases) |
SOURCE =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = alpha1.mlib.org)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = orabase)
)
)
TARGET =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = alpha2.mlib.org)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = targetdb)
)
)
-- on both servers
ping 192.168.1.19
ping 192.168.1.91
-- use the net service aliases defined in TNSNAMES.ORA (SOURCE and TARGET)
tnsping source
tnsping target
-- on source in SQL*Plus
conn remoteadmin/remoteadmin@target;
-- on target in SQL*Plus
conn localadmin/localadmin@source;
-- if any of these do not work, the listeners need to be properly configured and started |
| |
| Create Database Link |
| on Source DB |
on Target DB |
conn localadmin/localadmin
-- show this database's global name
SELECT * FROM global_name;
-- private link to the target database, connecting as the remote Streams administrator;
-- USING takes the net service alias defined for the target in TNSNAMES.ORA
CREATE DATABASE LINK targetdb
CONNECT TO remoteadmin
IDENTIFIED BY remoteadmin
USING 'TARGET';
-- verify the link by querying through the name it was created with
SELECT COUNT(*)
FROM user_objects@targetdb; |
conn remoteadmin/remoteadmin
-- show this database's global name
SELECT * FROM global_name;
-- private link back to the source database, connecting as the source Streams
-- administrator (LOCALADMIN); USING takes the net service alias defined for the
-- source in TNSNAMES.ORA
CREATE DATABASE LINK orabase
CONNECT TO localadmin
IDENTIFIED BY localadmin
USING 'SOURCE';
-- verify the link by querying through the name it was created with
SELECT SYSDATE
FROM dual@orabase; |
|
| |
| Prepare Schema Tables for Streams Replication |
| on Source DB |
on Target DB |
conn / as sysdba
-- cdef$ entries for SCOTT's tables before supplemental logging is added
SELECT do.object_id,
       do.object_name,
       c.type#
FROM dba_objects do
INNER JOIN cdef$ c
   ON c.obj# = do.object_id
WHERE do.owner = 'SCOTT'
  AND do.object_type = 'TABLE'
ORDER BY c.type#;
conn scott/tiger
-- tables owned by SCOTT
SELECT table_name
FROM user_tables;
-- log all columns of every SCOTT table
ALTER TABLE bonus ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE dept ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE emp ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE salgrade ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
conn / as sysdba
-- same cdef$ query again, to compare with the output taken before the ALTER TABLEs
SELECT do.object_id,
       do.object_name,
       c.type#
FROM dba_objects do
INNER JOIN cdef$ c
   ON c.obj# = do.object_id
WHERE do.owner = 'SCOTT'
  AND do.object_type = 'TABLE'
ORDER BY c.type#; |
conn / as sysdba
-- schema on the target that will hold the replicated SCOTT objects
CREATE USER scottrep IDENTIFIED BY scottrep
  DEFAULT TABLESPACE uwdata
  TEMPORARY TABLESPACE temp
  QUOTA 50M ON uwdata
  QUOTA 0 ON system
  QUOTA 0 ON sysaux;
-- privileges needed to log in and own tables and types
GRANT create session, create table, create type TO scottrep; |
|
| |
| Create Streams Queues |
| on Source DB |
on Target DB |
dbms_streams_adm.set_up_queue(
queue_table IN VARCHAR2 DEFAULT 'streams_queue_table',
storage_clause IN VARCHAR2 DEFAULT NULL,
queue_name IN VARCHAR2 DEFAULT 'streams_queue',
queue_user IN VARCHAR2 DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL); |
-- connect as the source Streams administrator created earlier (LOCALADMIN)
conn localadmin/localadmin
-- queues and queue tables owned by this user before the Streams queue is created
SELECT COUNT(*)
FROM user_queues;
SELECT COUNT(*)
FROM user_queue_tables;
-- create the capture queue and its queue table;
-- an anonymous block is used because a SQL*Plus EXEC command cannot span lines
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table    => 'SOURCE_SCOTT_QTAB',
    storage_clause => 'PCTFREE 0 PCTUSED 99',
    queue_name     => 'SCOTT_CAPTURE_Q',
    queue_user     => 'LOCALADMIN',
    comment        => 'Demo Orabase Streams Queue');
END;
/
-- inspect the queue that was created
desc user_queues
col name format a24
col queue_table format a18
col queue_type format a16
col retention format a10
col user_comment format a30
SELECT name, queue_table, queue_type, enqueue_enabled,
dequeue_enabled, retention, user_comment
FROM user_queues;
-- inspect the queue table that was created
desc user_queue_tables
col object_type format a12
col sort_order format a12
col recipients format a11
col message_grouping format a17
SELECT queue_table, type, object_type, sort_order, recipients, message_grouping
FROM user_queue_tables;
SELECT compatible, primary_instance, secondary_instance, owner_instance, user_comment, secure
FROM user_queue_tables;
desc source_scott_qtab |
conn remoteadmin/remoteadmin
-- queues and queue tables owned by this user before the Streams queue is created
SELECT COUNT(*)
FROM user_queues;
SELECT COUNT(*)
FROM user_queue_tables;
-- create the apply queue and its queue table;
-- an anonymous block is used because a SQL*Plus EXEC command cannot span lines
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table    => 'TARGET_SCOTT_QTAB',
    storage_clause => 'PCTFREE 0 PCTUSED 99',
    queue_name     => 'SCOTT_APPLY_Q',
    queue_user     => 'REMOTEADMIN',
    comment        => 'Demo TargetDB Streams Queue');
END;
/
-- inspect the queue that was created
desc user_queues
col name format a24
col queue_table format a18
col queue_type format a16
col retention format a10
col user_comment format a30
SELECT name, queue_table, queue_type, enqueue_enabled,
dequeue_enabled, retention, user_comment
FROM user_queues;
-- inspect the queue table that was created
desc user_queue_tables
col object_type format a12
col sort_order format a12
col recipients format a11
col message_grouping format a17
SELECT queue_table, type, object_type, sort_order, recipients, message_grouping
FROM user_queue_tables;
SELECT compatible, primary_instance, secondary_instance, owner_instance, user_comment, secure
FROM user_queue_tables;
desc target_scott_qtab |
|
|
| |
| Create Capture Process (on Source DB only) |
dbms_streams_adm.add_schema_rules(
schema_name IN VARCHAR2,
streams_type IN VARCHAR2,
streams_name IN VARCHAR2 DEFAULT NULL,
queue_name IN VARCHAR2 DEFAULT 'streams_queue',
include_dml IN BOOLEAN DEFAULT TRUE,
include_ddl IN BOOLEAN DEFAULT FALSE,
include_tagged_lcr IN BOOLEAN DEFAULT FALSE,
source_database IN VARCHAR2 DEFAULT NULL,
dml_rule_name OUT VARCHAR2,
ddl_rule_name OUT VARCHAR2,
inclusion_rule IN BOOLEAN DEFAULT TRUE,
and_condition IN VARCHAR2 DEFAULT NULL); |
|
-- number of Streams schema rules before the capture rules are added
SELECT COUNT(*)
FROM dba_streams_schema_rules;
set serveroutput on
-- add DML and DDL capture rules for schema SCOTT to capture SCOTT_CAPTURE
-- on queue SCOTT_CAPTURE_Q, then print the generated rule names
DECLARE
  v_ddl_rule  VARCHAR2(30);
  v_dml_rule  VARCHAR2(30);
BEGIN
  dbms_streams_adm.add_schema_rules(
    schema_name        => 'SCOTT',
    streams_type       => 'CAPTURE',
    streams_name       => 'SCOTT_CAPTURE',
    queue_name         => 'SCOTT_CAPTURE_Q',
    include_dml        => TRUE,
    include_ddl        => TRUE,
    include_tagged_lcr => FALSE,
    dml_rule_name      => v_dml_rule,
    ddl_rule_name      => v_ddl_rule,
    inclusion_rule     => TRUE,
    and_condition      => NULL);

  dbms_output.put_line('DML Rule Name: ' || v_dml_rule);
  dbms_output.put_line('DDL Rule Name: ' || v_ddl_rule);
END;
/
-- typical return values
DML Rule Name: "LOCALADMIN"."SCOTT1"
DDL Rule Name: "LOCALADMIN"."SCOTT2"
-- capture processes
describe dba_capture
SELECT * FROM dba_capture;
-- extra attributes recorded by capture
describe dba_capture_extra_attributes
SELECT * FROM dba_capture_extra_attributes;
-- capture process parameters
describe dba_capture_parameters
column capture_name format a16
column parameter format a28
SELECT * FROM dba_capture_parameters;
-- schemas prepared for capture
describe dba_capture_prepared_schemas
SELECT * FROM dba_capture_prepared_schemas;
-- tables prepared for capture
describe dba_capture_prepared_tables
column table_owner format a12
column table_name format a12
SELECT * FROM dba_capture_prepared_tables; |
| |
| Export / Import Scott Schema (preferably do this with DataPump) |
| Note: This step requires that the schema in the Source database be created in the Target database.
The example demonstrates exporting and importing the data using DataPump.
Transferring the physical file between the servers can be done with anything from SCP or FTP to a flash drive depending upon circumstances. |
| on Source DB |
on Target DB |
-- look up the operating system path behind the DATA_PUMP_DIR directory object
SELECT directory_path
FROM dba_directories
WHERE directory_name = 'DATA_PUMP_DIR';
-- in a terminal window
-- export schema SCOTT to streamsdemo1.dmp in data_pump_dir
expdp system/oracle1 DIRECTORY=data_pump_dir DUMPFILE=streamsdemo1.dmp SCHEMAS=scott |
-- look up the operating system path behind the DATA_PUMP_DIR directory object;
-- the dump file copied from the source must be placed there
SELECT directory_path
FROM dba_directories
WHERE directory_name = 'DATA_PUMP_DIR';
-- in a terminal window
-- import SCOTT's objects into schema SCOTTREP; the command must be entered on a
-- single line, otherwise REMAP_SCHEMA is not passed to impdp
impdp system/oracle1 DIRECTORY=data_pump_dir DUMPFILE=data_pump_dir:streamsdemo1.dmp SCHEMAS=scott REMAP_SCHEMA=scott:scottrep |
|
| |
| Create Propagation Process (on the source database only) |
dbms_streams_adm.add_schema_propagation_rules(
schema_name IN VARCHAR2,
streams_name IN VARCHAR2 DEFAULT NULL,
source_queue_name IN VARCHAR2,
destination_queue_name IN VARCHAR2,
include_dml IN BOOLEAN DEFAULT TRUE,
include_ddl IN BOOLEAN DEFAULT FALSE,
include_tagged_lcr IN BOOLEAN DEFAULT FALSE,
source_database IN VARCHAR2 DEFAULT NULL,
dml_rule_name OUT VARCHAR2,
ddl_rule_name OUT VARCHAR2,
inclusion_rule IN BOOLEAN DEFAULT TRUE,
and_condition IN VARCHAR2 DEFAULT NULL,
queue_to_queue IN BOOLEAN DEFAULT NULL); |
|
set serveroutput on
-- add DML propagation rules for schema SCOTT from the local capture queue to the
-- apply queue on the target database, then print the generated rule names
DECLARE
 ddl_rname VARCHAR2(30);
 dml_rname VARCHAR2(30);
BEGIN
  dbms_streams_adm.add_schema_propagation_rules(
  schema_name => 'SCOTT',
  streams_name => 'SCOTT_CAPTURE',
  source_queue_name => 'SCOTT_CAPTURE_Q',
  -- the destination queue is owned by REMOTEADMIN in the target database; without
  -- the owner and database link the name resolves to a queue of the current user
  -- in the local database
  destination_queue_name => 'REMOTEADMIN.SCOTT_APPLY_Q@TARGETDB',
  include_dml => TRUE,
  include_ddl => FALSE,
  include_tagged_lcr => FALSE,
  source_database => 'ORABASE',
  dml_rule_name => dml_rname,
  ddl_rule_name => ddl_rname,
  inclusion_rule => TRUE,
  and_condition => NULL,
  queue_to_queue => TRUE);
  dbms_output.put_line('DML Rule Name: ' || dml_rname);
  -- empty in this demo because include_ddl is FALSE
  dbms_output.put_line('DDL Rule Name: ' || ddl_rname);
END;
/
-- typical return values
DML Rule Name: "LOCALADMIN"."SCOTT16"
DDL Rule Name:
-- propagation definition: queues, database link and rule sets
describe dba_propagation
column propagation_name format a17
column SQName format a16
column DQOwner format a11
column DQName format a14
column DBLINK format a10
column rule_set_name format a14
column negative_rule_set_name format a23
column queue_to_queue format a15
SELECT propagation_name,
       status,
       source_queue_name AS SQName,
       destination_queue_owner AS DQOwner,
       destination_queue_name AS DQName,
       destination_dblink AS DBLINK
FROM dba_propagation;
SELECT propagation_name,
       rule_set_name,
       negative_rule_set_name,
       queue_to_queue
FROM dba_propagation;
-- examine rules created by previous actions
describe dba_rules
column rule_owner format a12
column rule_name format a12
SELECT rule_owner,
       rule_name,
       rule_condition
FROM dba_rules;
-- rule sets owned by LOCALADMIN
describe dba_rulesets
column ruleset_name format a20
column base_table format a35
column ruleset_comment format a30
SELECT ruleset_name,
       base_table,
       ruleset_comment
FROM dba_rulesets
WHERE owner = 'LOCALADMIN';
-- membership of rules in LOCALADMIN's rule sets
describe dba_rule_set_rules
column rule_set_name format a20
column rule_set_rule_comment format a32
SELECT rule_set_name,
       rule_name,
       rule_set_rule_enabled,
       rule_set_rule_eval_ctx_name
FROM dba_rule_set_rules
WHERE rule_set_owner = 'LOCALADMIN';
SELECT rule_set_name,
       rule_name,
       rule_set_rule_enabled,
       rule_set_rule_comment
FROM dba_rule_set_rules
WHERE rule_set_owner = 'LOCALADMIN';
-- schema-level Streams rules
describe dba_streams_schema_rules
column streams_name format a14
column streams_type format a13
column rule_type format a10
column inc_tagged_lcr format a15
SELECT streams_name,
       streams_type,
       rule_type,
       include_tagged_lcr AS INC_TAGGED_LCR
FROM dba_streams_schema_rules;
SELECT streams_name,
       rule_type,
       rule_name,
       rule_condition
FROM dba_streams_schema_rules;
-- revisit the capture and propagation status values
SELECT capture_name,
       status,
       captured_scn,
       applied_scn
FROM dba_capture;
SELECT propagation_name,
       status
FROM dba_propagation;
-- add table subset rules to the capture process so that only changes for employees whose department is 20 are captured
-- add subset rules on SCOTT.EMP (DEPTNO=20) to capture SCOTT_CAPTURE and print
-- the names of the generated insert, update and delete rules
DECLARE
  v_insert_rule  VARCHAR2(30);
  v_update_rule  VARCHAR2(30);
  v_delete_rule  VARCHAR2(30);
BEGIN
  dbms_streams_adm.add_subset_rules(
    table_name         => 'SCOTT.EMP',
    dml_condition      => 'DEPTNO=20',
    streams_type       => 'CAPTURE',
    streams_name       => 'SCOTT_CAPTURE',
    queue_name         => 'SCOTT_CAPTURE_Q',
    include_tagged_lcr => FALSE,
    source_database    => NULL,
    insert_rule_name   => v_insert_rule,
    update_rule_name   => v_update_rule,
    delete_rule_name   => v_delete_rule);

  dbms_output.put_line('IRULE: ' || v_insert_rule);
  dbms_output.put_line('URULE: ' || v_update_rule);
  dbms_output.put_line('DRULE: ' || v_delete_rule);
END;
/
-- typical return values
"LOCALADMIN"."EMP10"
"LOCALADMIN"."EMP10"
"LOCALADMIN"."EMP10"
-- rule set membership of rule EMP10
SELECT rule_set_name,
       rule_name,
       rule_set_rule_enabled
FROM dba_rule_set_rules
WHERE rule_set_owner = 'LOCALADMIN'
  AND rule_name = 'EMP10';
-- table-level Streams rules
describe dba_streams_table_rules
column table_name format a11
column dml_condition format a14
column subsetting_operation format a21
SELECT streams_name,
       streams_type,
       table_name,
       rule_type,
       dml_condition,
       subsetting_operation
FROM dba_streams_table_rules;
SELECT streams_name,
       source_database,
       rule_name,
       rule_condition
FROM dba_streams_table_rules; |
| |
| Set Schema Instantiation SCN |
dbms_apply_adm.set_schema_instantiation_scn(
source_schema_name IN VARCHAR2,
source_database_name IN VARCHAR2,
instantiation_scn IN NUMBER,
apply_database_link IN VARCHAR2 DEFAULT NULL,
recursive IN BOOLEAN DEFAULT FALSE); |
|
| on Source DB |
on Target DB |
-- read the current system change number; the value is passed as instantiation_scn
-- to dbms_apply_adm.set_schema_instantiation_scn
SELECT dbms_flashback.get_system_change_number
FROM dual; |
-- record the SCN at which schema SCOTT from source database ORABASE was instantiated;
-- 4648838 is a sample value: substitute the SCN read from the source database
BEGIN
  dbms_apply_adm.set_schema_instantiation_scn(
  source_schema_name => 'SCOTT',
  source_database_name => 'ORABASE',
  instantiation_scn => 4648838,
  apply_database_link => NULL,
  recursive => FALSE);
END;
/ |
|
| |
| Monitor Streams Replication |
-- propagation schedule status: one row per propagation joined to its queue schedule
-- on source queue owner/name and destination database link
SELECT dp.propagation_name,
       TO_CHAR(dqs.start_date, 'HH24:MI:SS MM/DD/YY') AS START_DATE,
       dqs.message_delivery_mode AS MSG_DEL_MODE,
       dqs.next_time,
       dqs.latency,
       CASE dqs.schedule_disabled
         WHEN 'Y' THEN 'Disabled'
         WHEN 'N' THEN 'Enabled'
       END AS SCHEDULE_DISABLED,
       dqs.process_name,
       (dqs.total_bytes/1024/1024) AS SIZE_MB
FROM dba_propagation dp
INNER JOIN dba_queue_schedules dqs
   ON dqs.schema = dp.source_queue_owner
  AND dqs.qname = dp.source_queue_name
  -- the text after '@' in the schedule destination, or the whole destination if there is no '@'
  AND dp.destination_dblink = COALESCE(REGEXP_SUBSTR(dqs.destination, '[^@]+', 1, 2), dqs.destination)
ORDER BY dp.propagation_name;
SQL> select count(*) from DBA_STREAMS_COLUMNS where owner = 'SCOTT';
COUNT(*)
----------
18
SQL> SELECT RULE_NAME, RULE_TYPE, DML_CONDITION
2 FROM DBA_STREAMS_RULES
3 WHERE STREAMS_NAME = 'SCOTT_CAPTURE';
RULE_NAME RULE_TYPE DML_CONDITION
------------ ---------- --------------
ORABASE5 DDL
ORABASE4 DML
SCOTT1 DML
SCOTT2 DDL
EMP7 DML DEPTNO=20
EMP8 DML DEPTNO=20
EMP9 DML DEPTNO=20
EMP10 DML DEPTNO=20
EMP11 DML DEPTNO=20
EMP12 DML DEPTNO=20
SELECT RULE_NAME, RULE_CONDITION
FROM DBA_STREAMS_RULES
WHERE STREAMS_NAME = 'SCOTT_CAPTURE';
SQL> select dbms_flashback.get_system_change_number() from dual;
DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER()
-----------------------------------------
4588418
-- good on target
-- set the instantiation SCN for schema SCOTT of source database ORABASE to 4588418
-- NOTE(review): apply_database_link is 'TARGETDB' although the annotation says this runs on the target - confirm
exec dbms_apply_adm.set_schema_instantiation_scn(source_schema_name => 'SCOTT', source_database_name => 'ORABASE', instantiation_scn => 4588418, apply_database_link => 'TARGETDB', recursive => FALSE);
-- good on target
-- list the apply processes and their status
select apply_name, status from dba_apply;
-- good on source
-- start the capture process SCOTT_CAPTURE
BEGIN
dbms_capture_adm.start_capture(capture_name => 'SCOTT_CAPTURE');
END;
/
SQL> select capture_name, start_scn, status
2 from dba_capture;
CAPTURE_NAME START_SCN STATUS
------------------------------ ---------- --------
SCOTT_CAPTURE 4489419 ENABLED
-- test row in department 20, which matches the DEPTNO=20 subset rule;
-- the explicit column list keeps the insert valid if the column order of EMP changes
INSERT INTO emp (empno, ename, job, mgr, hiredate, sal, comm, deptno)
VALUES (1111, 'MORGAN', 'DBA', 7566, TRUNC(SYSDATE), 4000, 0, 20);
COMMIT; |
| |
| Post-Streams Clean-up |
| on Source DB |
on TARGETDB |
-- the database link is private to LOCALADMIN, so it is dropped by its owner
-- before the account itself is removed
conn localadmin/localadmin
DROP DATABASE LINK targetdb;
conn / as sysdba
-- stop capture process and queue
-- the queue name is qualified with its owner: an unqualified name would be
-- resolved in the schema of the connected user (SYS)
exec dbms_streams_adm.remove_queue('LOCALADMIN.SCOTT_CAPTURE_Q', TRUE, TRUE);
-- remove the supplemental logging added for the demo
ALTER TABLE scott.bonus
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.dept
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.emp
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE scott.salgrade
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- remove the Streams administrator and everything it owns
DROP USER localadmin CASCADE; |
conn / as sysdba
-- stop apply process
-- NOTE(review): no explicit stop or drop of the apply process or queue is shown here;
-- the cleanup consists only of dropping the REMOTEADMIN account - confirm this is sufficient
DROP USER remoteadmin CASCADE; |
|
|