Oracle Change Data Capture Synchronous Mode Demo
Version 12.1.0.2

General Information
Library Note [an error occurred while processing this directive]
Demo Description Change Data Capture, CDC has been deprecated as of 12cR1 but still works perfectly well for what it was intended to do. This demonstration shows how to utilize CDC's sychronous mode capture to replicate data between schemas which could, just as easily be in different databases on different servers, in different data centers.
 
Setup As SYS - Prepare Database and Instance
conn / as sysdba

-- *NIX only and if not in glogin.sql which is where it belongs
define _editor=vi

-- validate database parameters
archive log list                    -- archive mode
show parameter global_names         -- must be TRUE

SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE FORCE LOGGING;

-- one option among several

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

ALTER DATABASE OPEN;

-- validate archive logging
ARCHIVE LOG LIST;
ALTER SYSTEM SWITCH LOGFILE;
ARCHIVE LOG LIST;

-- validate force and supplemental logging
col log_min format a7
col log_pk format a6
col log_pk format a6
col log_ui format a6
col log_fk format a6
col log_all format a7
col force_log format a9

SELECT supplemental_log_data_min LOG_MIN, supplemental_log_data_pk LOG_PK, supplemental_log_data_ui LOG_UI, supplemental_log_data_fk LOG_FK, supplemental_log_data_all LOG_ALL, force_logging FORCE_LOG
FROM v$database;

SELECT tablespace_name, force_logging
FROM dba_tablespaces;

-- examine CDC related data dictionary objects
SELECT table_name
FROM dba_tables
WHERE owner = 'SYS'
AND table_name LIKE 'CDC%$';

desc cdc_system$

SELECT * FROM cdc_system$;
 
Setup As SYS - Create Streams Administrators
conn sys@pdbdev as sysdba

SELECT *
FROM dba_streams_administrator;

CREATE USER cdcadmin
IDENTIFIED BY cdcadmin
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 20M ON uwdata;

-- system privs
GRANT create session TO cdcadmin;
GRANT create table TO cdcadmin;
GRANT create sequence TO cdcadmin;
GRANT create procedure TO cdcadmin;
GRANT create any job TO cdcadmin;

-- role privs
GRANT execute_catalog_role TO cdcadmin;
GRANT select_catalog_role TO cdcadmin;

-- object privileges
GRANT execute ON dbms_cdc_publish TO cdcadmin;
GRANT execute ON dbms_cdc_subscribe TO cdcadmin;

-- streams specific priv
execute dbms_streams_auth.grant_admin_privilege('CDCADMIN');

SELECT account_status, created
FROM dba_users
WHERE username = 'CDCADMIN';

SELECT *
FROM dba_sys_privs
WHERE grantee = 'CDCADMIN';

SELECT username
FROM dba_users u, streams$_privileged_user s
WHERE u.user_id = s.user#;

col local_privileges format a20
col access_from_remote format a20

SELECT *
FROM dba_streams_administrator;
 
Prepare Schema Tables for CDC Replication
conn sys@pdbdev as sysdba

alter user hr account unlock identified by hr;

connect hr/hr@pdbdev

SELECT table_name, reason
FROM all_streams_unsupported
WHERE owner = 'HR'
ORDER BY 1;

desc employees

SELECT *
FROM employees;

-- create CDC demo table
CREATE TABLE cdc_demo1 AS
SELECT * FROM employees;

ALTER TABLE cdc_demo1
ADD CONSTRAINT pk_cdc_demo1
PRIMARY KEY (employee_id)
USING INDEX;

-- a second way to implement supplemental logging
SELECT * FROM user_log_groups;

ALTER TABLE cdc_demo1
ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

col table_name format a20
col log_group_name format a15

SELECT * FROM user_log_groups;

-- table to track salary history changes originating in cdc_demo
CREATE TABLE salary_history (
employee_id NUMBER(6),
first_name  VARCHAR2(20),
last_name   VARCHAR2(25),
old_salary  NUMBER(8,2),
new_salary  NUMBER(8,2),
pct_change  NUMBER(4,2),
action_date DATE);

SELECT object_name, created
FROM user_objects
WHERE object_type = 'TABLE'
ORDER BY 2,1;
 
Instantiate Source Table
conn cdcadmin/cdcadmin@pdbdev

desc dba_capture_prepared_tables

SELECT table_name, scn, supplemental_log_data_pk, supplemental_log_data_ui,
supplemental_log_data_fk, supplemental_log_data_all
FROM dba_capture_prepared_tables;

dbms_capture_adm.prepare_table_instantiation(
table_name           IN VARCHAR2,
supplemental_logging IN VARCHAR2 DEFAULT 'keys');

exec dbms_capture_adm.prepare_table_instantiation('HR.CDC_DEMO1');

SELECT table_name, scn, supplemental_log_data_pk PK, supplemental_log_data_ui UI,
supplemental_log_data_fk FK, supplemental_log_data_all "ALL"
FROM dba_capture_prepared_tables;
 
Create Synchronous Change Set
conn cdcadmin/cdcadmin@pdbdev

desc all_change_sets

col set_name format a13
col change_source_name format a19

SELECT set_name, change_source_name, created, ignore_ddl, begin_date, end_date
FROM all_change_sets;

dbms_cdc_publish.create_change_set(
change_set_name    IN VARCHAR2,
description        IN VARCHAR2 DEFAULT NULL,
change_source_name IN VARCHAR2,
stop_on_ddl        IN CHAR DEFAULT 'N',
begin_date         IN DATE DEFAULT NULL,
end_date           IN DATE DEFAULT NULL
);

-- this may take a minute or two
exec dbms_cdc_publish.create_change_set('CDC_DEMO_SET', 'Synch Capture Demo', 'SYNC_SOURCE');

SELECT set_name, change_source_name, created, ignore_ddl, begin_date, end_date
FROM all_change_sets;

conn sys@pdbdev as sysdba

desc cdc_change_sets$

set linesize 121
col set_name format a20
col capture_name format a20
col queue_name format a20
col queue_table_name format a20

SELECT set_name, capture_name, queue_name, queue_table_name
FROM cdc_change_sets$;

col publisher format a15

SELECT set_name, change_source_name, capture_enabled, stop_on_ddl, publisher
FROM change_sets;

desc streams$_process_params;

SELECT process_type, name
FROM streams$_process_params;
 
Create Change Table
conn cdcadmin/cdcadmin@pdbdev

dbms_cdc_publish.create_change_table(
owner             IN VARCHAR2,
change_table_name IN VARCHAR2,
change_set_name   IN VARCHAR2,
source_schema     IN VARCHAR2,
source_table      IN VARCHAR2,
column_type_list  IN VARCHAR2,
capture_values    IN VARCHAR2, -- BOTH, NEW, OLD
rs_id             IN CHAR,
row_id            IN CHAR,
user_id           IN CHAR,
timestamp         IN CHAR,
object_id         IN CHAR,
source_colmap     IN CHAR,
target_colmap     IN CHAR,
options_string    IN VARCHAR2,
ddl_markers       IN CHAR DEFAULT 'Y');

exec dbms_cdc_publish.create_change_table('CDCADMIN', 'CDC_DEMO_CT', 'CDC_DEMO_SET', 'HR', 'CDC_DEMO1', 'EMPLOYEE_ID NUMBER(6), FIRST_NAME VARCHAR2(20), LAST_NAME VARCHAR2(25), SALARY NUMBER', 'BOTH', 'Y', 'Y', 'Y', 'N', 'N', 'Y', 'Y', ' TABLESPACE USERS pctfree 0 pctused 99', 'N');

-- note the importance for synchronous of ddl_markers (new in 11g)
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;

col high_value format a15

SELECT table_name, composite, partition_name, high_value
FROM user_tab_partitions;

GRANT select ON cdc_demo_ct TO hr;

conn sys@pdbdev as sysdba

desc cdc_change_tables$

col change_set_name format a20
col source_schema_name format a20
col source_table_name format a20

SELECT change_set_name, source_schema_name, source_table_name
FROM cdc_change_tables$;
 
Create Subscription
conn hr/hr@pdbdev

desc user_subscriptions

SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;

dbms_cdc_subscribe.create_subscription(
change_set_name   IN VARCHAR2,
description       IN VARCHAR2,
subscription_name IN VARCHAR2);

exec dbms_cdc_subscribe.create_subscription('CDC_DEMO_SET', 'Sync Capture Demo Set', 'CDC_DEMO_SUB');

col subscription_name format a20

SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;

conn sys@pdbdev as sysdba

set linesize 121
col description format a30
col subscription_name format a20
col username format a10

SELECT subscription_name, handle, set_name, username, earliest_scn, description
FROM cdc_subscribers$;
 
Subscribe to and Activate Subscription
conn hr/hr@pdbdev

desc user_subscribed_columns

SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;

dbms_cdc_subscribe.subscribe(
subscription_name IN VARCHAR2,
source_schema     IN VARCHAR2,
source_table      IN VARCHAR2,
column_list       IN VARCHAR2,
subscriber_view   IN VARCHAR2);

BEGIN
  dbms_cdc_subscribe.subscribe('CDC_DEMO_SUB', 'HR', 'CDC_DEMO1',
  'EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY', 'CDC_DEMO_SUB_VIEW');
END;
/

SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;

dbms_cdc_subscribe.activate_subscription(
subscription_name IN VARCHAR2);

exec dbms_cdc_subscribe.activate_subscription('CDC_DEMO_SUB');

SELECT set_name, subscription_name, status
FROM user_subscriptions;
 
Create Procedure To Populate Salary History Table
conn hr/hr@pdbdev

/* Create a stored procedure to populate the new HR.SALARY_HISTORY table. The procedure extends the subscription window of the CDC_DEMP_SUB subscription to get the most recent set of source table changes. It uses the subscriber's DEMO_SUB_VIEW view to scan the changes and insert them into the SALARY_HISTORY table. It then purges the subscription window to indicate that it is finished with that set of changes. */

CREATE OR REPLACE PROCEDURE update_salary_history AUTHID DEFINER IS
 CURSOR cur IS
 SELECT *
 FROM (
   SELECT 'I' opt, cscn$, rsid$, employee_id, first_name, last_name, 0 old_salary,
   salary new_salary, commit_timestamp$
   FROM cdc_demo_sub_view
   WHERE operation$ = 'I '
   UNION ALL
   SELECT 'D' opt, cscn$, rsid$, employee_id, first_name, last_name, salary old_salary,
   0 new_salary, commit_timestamp$
   FROM cdc_demo_sub_view
   WHERE operation$ = 'D '
   UNION ALL
   SELECT 'U' opt , v1.cscn$, v1.rsid$, v1.employee_id, v1.first_name, v1.last_name,
   v1.salary old_salary, v2.salary new_salaryi, v1.commit_timestamp$
   FROM cdc_demo_sub_view v1, cdc_demo_sub_view v2
   WHERE v1.operation$ = 'UU' and v2.operation$ = 'UN'
   AND v1.cscn$ = v2.cscn$
   AND v1.rsid$ = v2.rsid$
   AND ABS(v1.salary - v2.salary) > 0)
   ORDER BY cscn$, rsid$;

 percent NUMBER;
BEGIN
  --Step 1 Get the change (extend the window)
  dbms_cdc_subscribe.extend_window('CDC_DEMO_SUB');

  FOR rec IN cur LOOP
    IF rec.opt = 'I' THEN
      INSERT INTO salary_history
      (employee_id, first_name, last_name, old_salary, new_salary, pct_change, action_date)
      VALUES
      (rec.employee_id, rec.first_name, rec.last_name, 0, rec.new_salary, NULL,
       rec.commit_timestamp$);
    END IF;

    IF rec.opt = 'D' THEN
      INSERT INTO salary_history
      (employee_id, first_name, last_name, old_salary, new_salary, pct_change, action_date)
      VALUES
      (rec.employee_id, rec.first_name, rec.last_name, rec.old_salary, 0, NULL,
       rec.commit_timestamp$);
    END IF;

    IF rec.opt = 'U' THEN
      percent := (rec.new_salary - rec.old_salary) / rec.old_salary * 100;
      INSERT INTO salary_history
      (employee_id, first_name, last_name, old_salary, new_salary, pct_change, action_date)
      VALUES
      (rec.employee_id, rec.first_name, rec.last_name, rec.old_salary,
      rec.new_salary, percent, rec.commit_timestamp$);
    END IF;
  END LOOP;
  COMMIT;

  --Step 2 Purge the window of consumed data
  dbms_cdc_subscribe.purge_window('CDC_DEMO_SUB');
END update_salary_history;
/
 
Source Table DML
conn hr/hr@pdbdev

SELECT employee_id, first_name, last_name, salary
FROM cdc_demo1
ORDER BY 1 DESC;

SELECT employee_id, first_name, last_name, salary
FROM cdc_demo_sub_view;

SELECT *
FROM salary_history;

UPDATE cdc_demo1
SET salary = salary+1
WHERE employee_id = 100;

COMMIT;

SELECT employee_id,first_name,last_name,salary
FROM cdc_demo_sub_view;

exec update_salary_history;

SELECT employee_id,first_name,last_name,salary
FROM cdc_demo_sub_view;

SELECT *
FROM salary_history;

conn cdcadmin/cdcadmin@pdbdev

col username$ format a10

SELECT operation$, cscn$, commit_timestamp$, username$, employee_id, salary
FROM cdc_demo_ct;
 
Capture Cleanup
conn hr/hr@pdbdev

exec dbms_cdc_subscribe.drop_subscription('CDC_DEMO_SUB');

drop table salary_history purge;

drop procedure update_salary_history;

conn / as sysdba

-- reverse prepare table instantiation
exec dbms_capture_adm.abort_table_instantiation('HR.CDC_DEMO');

-- drop the change table
exec dbms_cdc_publish.drop_change_table('CDCADMIN', 'CDC_DEMO_CT', 'Y');

-- drop the change set
exec dbms_cdc_publish.drop_change_set('CDC_DEMO_SET');

drop user cdcadmin cascade;

Related Topics
CDC Demo - Asynchronous HotLog
CDC Demo - Asynchronous AutoLog
DBMS_CAPTURE_ADM
DBMS_CDC_PUBLISH
DBMS_CDC_SUBSCRIBE
DBMS_CDC_UTILITY
DBMS_STREAMS_AUTH
DBMS_OUTPUT
Export
Import
Streams Demo 1
User

Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: © 2012 Daniel A. Morgan All Rights Reserved