Oracle AQ RAC Cluster Demo
Version 10.2.0.5
 
Test Runs
1 No queue table affinity: Enqueues and Dequeues on both nodes Test failover from Shutdown Abort
2 Queue affinity for each queue set: Enqueues and Dequeues on both nodes Test failover from loss of memory interconnect *
3 Node affinity set: Enqueue on instance 1: Dequeue on instance 2 Test failover from loss of storage connection
4 Node affinity set: Enqueue and Dequeue assigned by instance Test with resumable sessions
 
Note:
Demos 3 and 4 were not run during class due to the CRS crash. 
To properly run a cluster you must multi-path your NIC connections else you have a single point of failure.
 
Step 1: Cluster Verification
conn / as sysdba

set linesize 121
col comp_name format a40
col name format a30
col value format a30
col object_name format a30

-- DB version
SELECT comp_name, version, status
FROM dba_registry
ORDER BY 1,2;

SELECT dbms_utility.port_string
FROM dual;

set serveroutput on

DECLARE
 ver    VARCHAR2(100);
 compat VARCHAR2(100);
BEGIN
  dbms_utility.db_version(ver, compat);
  dbms_output.put_line('Version: ' || ver ||' Compatible: ' || compat);
END;
/

-- Object status
SELECT owner, object_type, COUNT(*)
FROM dba_objects
WHERE status = 'INVALID'
GROUP BY owner, object_type;

DECLARE
 inst_tab dbms_utility.instance_table;
 inst_cnt NUMBER;
BEGIN
  IF dbms_utility.is_cluster_database THEN
    dbms_utility.active_instances(inst_tab, inst_cnt);
    dbms_output.put_line('-' || inst_tab.FIRST);
    dbms_output.put_line(TO_CHAR(inst_cnt));
  ELSE
    dbms_output.put_line('Not A Clustered Database');
  END IF;
END;
/

SELECT dbms_utility.current_instance
FROM dual;

-- GV$ object discussion

-- validate parameters

SELECT name, value
FROM gv$parameter
WHERE inst_id = 1
MINUS
SELECT name, value
FROM gv$parameter
WHERE inst_id = 2;
 
Step 2: Infrastructure Research
$ cd $ORACLE_HOME/rdbms/admin

more catqueue.sql

-- deciphering flags
SQL> SELECT t.schema, t.name, t.flags, q.name
2 FROM system.aq$_queue_tables t, sys.aq$_queue_table_affinities aft, system.aq$_queues q
3 WHERE aft.table_objno = t.objno
4 AND aft.owner_instance = 1
5 AND q.table_objno = t.objno
6 AND q.usage = 0 and bitand(t.flags, 4+16+32+64+128+256) = 0;

SCHEMA     NAME                           FLAGS      NAME
---------- ------------------------------ ---------- ----------------------
SYS        SCHEDULER$_EVENT_QTAB          28681      SCHEDULER$_EVENT_QUEUE
SYS        SCHEDULER$_REMDB_JOBQTAB       24585      SCHEDULER$_REMDB_JOBQ
SYS        ALERT_QT                       28681      ALERT_QUE
SYS        AQ_EVENT_TABLE                 16384      AQ_EVENT_TABLE_Q
SYS        AQ_SRVNTFN_TABLE               24584      AQ_SRVNTFN_TABLE_Q
SYS        AQ_PROP_TABLE                  24585      AQ_PROP_NOTIFY
SYSTEM     DEF$_AQCALL                    16384      DEF$_AQCALL
SYSTEM     DEF$_AQERROR                   16384      DEF$_AQERROR
SYS        SYS$SERVICE_METRICS_TAB        24585      SYS$SERVICE_METRICS
WMSYS      WM$EVENT_QUEUE_TABLE           24585      WM$EVENT_QUEUE
SYSMAN     MGMT_TASK_QTABLE               24584      MGMT_TASK_Q
SYSMAN     MGMT_NOTIFY_QTABLE             24585      MGMT_NOTIFY_Q
SYSMAN     MGMT_PAF_MSG_QTABLE_1          24584      MGMT_PAF_REQUEST_Q
SYSMAN     MGMT_PAF_MSG_QTABLE_2          24584      MGMT_PAF_RESPONSE_Q
SYSMAN     MGMT_LOADER_QTABLE             24585      MGMT_LOADER_Q
IX         ORDERS_QUEUETABLE              24585      ORDERS_QUEUE
IX         STREAMS_QUEUE_TABLE            28683      STREAMS_QUEUE
AQADMIN    SOURCE_QUEUE_TABLE             24585      SOURCE_QUEUE

18 rows selected.

24585 - 16384 = uses 10.0 name format

SQL> select 24585-16384 from dual;

24585-16384
-----------
8201

 8201 -  8192 = 10i style queue table

SQL> select 8201-8192 from dual;

8201-8192
-----------
9

     9 - 8 = for 8i multi-consumer queue table

     1 = for multiple dequeues

more catsadv.sql

more depsaq.sql
 
Step 3: Setup As SYS
conn / as sysdba

-- validate Oracle parameters (SET BACK TO 0)
show parameter aq_tm_processes

show parameter job_queue_processes

-- only if less than 20 then
ALTER SYSTEM SET job_queue_processes=20 SCOPE=BOTH;

show parameter job_queue_processes

-- create a tablespace
SELECT dbms_metadata.get_ddl('TABLESPACE', 'USERS')
FROM dual;

set long 100000

SELECT dbms_metadata.get_ddl('TABLESPACE', 'USERS')
FROM dual;

-- alter the DDL
-- create a new tablespace named UWDATA, size 300MB


-- create AQ administrator
CREATE USER aqadmin
IDENTIFIED BY aqadmin
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA UNLIMITED ON uwdata;

GRANT create session TO aqadmin;
GRANT create procedure TO aqadmin;
GRANT create sequence TO aqadmin;
GRANT create table TO aqadmin;
GRANT create type TO aqadmin;
GRANT create public synonym TO aqadmin;
GRANT aq_administrator_role TO aqadmin;

GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;
GRANT execute ON dbms_lock TO aqadmin;   -- required for demo but not for AQ
GRANT execute ON dbms_crypto TO aqadmin; -- required for demo but not for AQ
GRANT select ON dba_source TO aqadmin;

SELECT username, account_status, created
FROM dba_users
ORDER BY 1;

SELECT *
FROM dba_sys_privs
WHERE grantee = 'AQADMIN';

set linesize 121
col privilege format a15
col owner format a15

SELECT role, owner, table_name, privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY 4, 2, 3;

-- loader
CREATE USER loader
IDENTIFIED BY oracle1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA UNLIMITED ON uwdata;

GRANT create session TO loader;
GRANT create synonym TO loader;

-- receiver
CREATE USER recvr
IDENTIFIED BY oracle1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA UNLIMITED ON uwdata;

GRANT create session TO recvr;
GRANT create synonym TO recvr;

-- existing queue information
SELECT owner, queue_name, queue_table, consumer_name
FROM dba_queue_subscribers;

col grantee format a15
col grantor format a15

SELECT *
FROM queue_privileges;
 
Step 4: Setup As AQADMIN
conn aqadmin/oracle1

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

desc dba_source

SELECT COUNT(*)
FROM dba_source;

-- create message user-defined data type
CREATE OR REPLACE TYPE message_t AS OBJECT (
id      NUMBER,
source  VARCHAR2(4000));
/

desc message_t

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

GRANT execute ON message_t TO public;

-- examine message for message content demo
SELECT text
FROM user_source
WHERE name = 'MESSAGE_T'
ORDER BY line;
 
Step 5: Application and Queue Build As AQADMIN
-- table to hold dequeued messages
CREATE TABLE code_processed (
id     NUMBER,
source VARCHAR2(4000));

-- create a primary key
CREATE UNIQUE INDEX pk_code_processed
ON code_processed(id)
REVERSE;

ALTER TABLE code_processed
ADD CONSTRAINT pk_code_processed
PRIMARY KEY (id)
USING INDEX;

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

-- create a queue table
/* -- CREATE QUEUE TABLE syntax
dbms_aqadm.create_queue_table(
queue_table        IN VARCHAR2,      -- table's name
queue_payload_type IN VARCHAR2,      -- user defined data type's name
storage_clause     IN VARCHAR2       DEFAULT NULL, -- define pctfree
sort_list          IN VARCHAR2       DEFAULT NULL, -- priority and/or enq_time
multiple_consumers IN BOOLEAN        DEFAULT FALSE,
message_grouping   IN BINARY_INTEGER DEFAULT NONE,
comment            IN VARCHAR2       DEFAULT NULL, -- definer's comments
auto_commit        IN BOOLEAN        DEFAULT TRUE, -- user commit not required
primary_instance   IN BINARY_INTEGER DEFAULT 0,    -- manage queue in primary
secondary_instance IN BINARY_INTEGER DEFAULT 0,    -- RAC failover if possible
compatible         IN VARCHAR2       DEFAULT NULL, -- lowest compatible version
non_repudiation    IN BINARY_INTEGER DEFAULT 0,
secure             IN BOOLEAN        DEFAULT FALSE);
*/

exec dbms_aqadm.create_queue_table(
     queue_table => 'source_queue_table',
     queue_payload_type => 'message_t',
     storage_clause => 'PCTFREE 0 PCTUSED 99',
     sort_list => 'ENQ_TIME',
     multiple_consumers => TRUE,
     comment => 'Source code queue table',
     compatible => '10.0',
     secure => FALSE);

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

col object_name format a30

SELECT o.object_name, o.object_type, i.table_name
FROM user_objects o,user_indexes i
WHERE o.object_name = i.table_name (+)
ORDER BY 2, 1;

set linesize 121
col evaluation_function format a30
col evaluation_context_comment format a30
col table_name format a30
col user_comment format a30

-- examine evaluation context
SELECT * FROM user_evaluation_contexts;

-- examine evaluation context table
SELECT * FROM user_evaluation_context_tables;

-- examine queues
SELECT name, queue_table, user_comment
FROM user_queues;

-- examine sequence / pay specific attention to the order_flag parameter
SELECT sequence_name, min_value, max_value,
increment_by, cycle_flag, order_flag, cache_size
FROM user_sequences;

set describe depth 1

-- examine tables
desc SOURCE_QUEUE_TABLE

set describe depth all

desc SOURCE_QUEUE_TABLE

desc AQ$_SOURCE_QUEUE_TABLE_G
desc AQ$_SOURCE_QUEUE_TABLE_H
desc AQ$_SOURCE_QUEUE_TABLE_I
desc AQ$_SOURCE_QUEUE_TABLE_T
desc CODE_PROCESSED

SELECT table_name, iot_name
FROM user_tables;

-- examine views
desc AQ$SOURCE_QUEUE_TABLE
desc AQ$SOURCE_QUEUE_TABLE_S

-- create the source_queue using the source_queue_table
/* -- CREATE QUEUE syntax
dbms_aqadm.create_queue (
queue_name          IN VARCHAR2,      -- queue's name
queue_table         IN VARCHAR2,      -- previously defined queue table
queue_type          IN BINARY_INTEGER DEFAULT NORMAL_QUEUE, -- Normal or Exception
max_retries         IN NUMBER         DEFAULT NULL,  -- default is 2**31-1
retry_delay         IN NUMBER         DEFAULT 0,     -- in seconds
retention_time      IN NUMBER         DEFAULT 0,
dependency_tracking IN BOOLEAN        DEFAULT FALSE, -- must be FALSE: the default
comment             IN VARCHAR2       DEFAULT NULL,  -- definer's comment
auto_commit         IN BOOLEAN        DEFAULT TRUE);
*/

exec dbms_aqadm.create_queue(
     queue_name => 'source_queue',
     queue_table => 'source_queue_table',
     queue_type => dbms_aqadm.NORMAL_QUEUE,
     max_retries => 3,
     retry_delay => 0,
     comment => 'Source Code Queue');

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type
ORDER BY 1;
-- note QUEUE and RULE SET creation plus new views

SELECT name, queue_table, user_comment
FROM user_queues;

SELECT rule_set_name, rule_set_eval_context_owner, rule_set_eval_context_name
FROM user_rule_sets;

set long 100000
set pagesize 0

SELECT view_name, text
FROM user_views;

set pagesize 25

SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;

-- Start source queue
/* -- CREATE START_QUEUE syntax
dbms_aqadm.start_queue (
queue_name IN VARCHAR2,
enqueue    IN BOOLEAN DEFAULT TRUE,
dequeue    IN BOOLEAN DEFAULT TRUE);
*/

exec dbms_aqadm.start_queue(queue_name => 'SOURCE_QUEUE');

SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;

-- create queue subscribers
/* -- GRANT QUEUE PRIVILEGE syntax
dbms_aqadm.grant_queue_privilege (
privilege    IN VARCHAR2,
queue_name   IN VARCHAR2,
grantee      IN VARCHAR2,
grant_option IN BOOLEAN DEFAULT FALSE);
*/

BEGIN
  dbms_aqadm.grant_queue_privilege('DEQUEUE', 'SOURCE_QUEUE', 'LOADER', FALSE);
  dbms_aqadm.grant_queue_privilege('ENQUEUE', 'SOURCE_QUEUE', 'RECVR', FALSE);
END;
/

-- create queue subscribers
/* -- AQ$_AGENT TYPE definition
TYPE aq$_agent AS OBJECT (
name     VARCHAR2(30),      -- name of message producer or consumer
address  VARCHAR2(1024),    -- Protocol-specific address of the recipient.
                            -- Must be in the form [schema.]queue[@dblink].
protocol NUMBER DEFAULT 0); -- must be 0, other values for internal use only
*/

/* -- ADD SUBSCRIBER syntax
dbms_aqadm.add_subscriber(
queue_name     IN VARCHAR2,              -- name of queue
subscriber     IN sys.aq$_agent,         -- name, address and, protocol
rule           IN VARCHAR2 DEFAULT NULL, -- conditional / similar to WHERE clause
transformation IN VARCHAR2 DEFAULT NULL  -- message transformation rule
queue_to_queue IN BOOLEAN DEFAULT FALSE, -- TRUE indicates queue-to-queue messaging
delivery_mode  IN PLS_INTEGER DEFAULT dbms_aqadm.persistent); -- BUFFERED,
                                         -- PERSISTENT_OR_BUFFERED, or PERSISTENT:
                                         -- Not modifiable by ALTER_SUBSCRIBER
*/

DECLARE
 subsc_t    sys.aq$_agent;
 subsc_addr VARCHAR2(1024) := 'AQADMIN.SOURCE_QUEUE';
BEGIN
  subsc_t := sys.aq$_agent('loader', subsc_addr, 0);
  dbms_aqadm.add_subscriber('source_queue', subsc_t);

  subsc_t := sys.aq$_agent('recvr', subsc_addr, 0);
  dbms_aqadm.add_subscriber('source_queue', subsc_t);
END;
/

-- create propagation schedule
/* -- CREATE SCHEDULE PROPAGATION syntax
dbms_aqadm.schedule_propagation (
queue_name        IN VARCHAR2,                 -- name of queue
destination       IN VARCHAR2 DEFAULT NULL,    -- destination database link
start_time        IN DATE     DEFAULT SYSDATE, -- initial propagation start time
duration          IN NUMBER   DEFAULT NULL,    -- propagation window in seconds
next_time         IN VARCHAR2 DEFAULT NULL,    -- date-time of next window
latency           IN NUMBER   DEFAULT 60,      -- maximum wait in seconds
destination_queue IN VARCHAR2 DEFAULT NULL);   -- target queue name and db link
*/

exec dbms_aqadm.schedule_propagation(queue_name=>'source_queue', latency =>0);

/* -- DEQUEUE_OPTIONS_T definition
TYPE DEQUEUE_OPTIONS_T IS RECORD (
consumer_name  VARCHAR2(30) DEFAULT NULL,
dequeue_mode   BINARY_INTEGER DEFAULT REMOVE,
navigation     BINARY_INTEGER DEFAULT NEXT_MESSAGE,
visibility     BINARY_INTEGER DEFAULT ON_COMMIT,
wait           BINARY_INTEGER DEFAULT FOREVER,
msgid          RAW(16) DEFAULT NULL,
correlation    VARCHAR2(128) DEFAULT NULL,
deq_condition  VARCHAR2(4000) DEFAULT NULL,
signature      aq$_sig_prop DEFAULT NULL,
transformation VARCHAR2(60) DEFAULT NULL,
delivery_mode  PLS_INTEGER DEFAULT PERSISTENT);
*/

/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority        BINARY_INTEGER DEFAULT 1,
delay           BINARY_INTEGER DEFAULT NO_DELAY,
expiration      BINARY_INTEGER DEFAULT NEVER,
correlation     VARCHAR2(128) DEFAULT NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time    DATE,
state           BINARY_INTEGER,
sender_id       aq$_agent DEFAULT NULL,
original_msgid  RAW(16) DEFAULT NULL);
*/

/* -- AQ$_RECIPIENT_LIST_T definition
TYPE SYS.AQ$_RECIPIENT_LIST_T IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER;
*/

/* -- DEQUEUE syntax
dbms_aq.dequeue(
queue_name         IN  VARCHAR2,
dequeue_options    IN  dequeue_options_t,
message_properties OUT message_properties_t,
payload            OUT <user_defined_data_type_name>
msgid              OUT RAW);
*/
 
Step 6: Create Dequeue Procedure
-- create procedure to dequeue messages
CREATE OR REPLACE PROCEDURE demo_dequeue AUTHID DEFINER IS
 deq_msgid RAW(16);
 dopt      dbms_aq.dequeue_options_t;
 mprop     dbms_aq.message_properties_t;
 payload_t message_t;
 q_on_hand PLS_INTEGER;

 no_messages EXCEPTION;
 pragma exception_init(no_messages, -25228);

 pragma autonomous_transaction;
BEGIN
  dopt.consumer_name := 'RECVR';
  dopt.dequeue_mode := dbms_aq.remove;
  dopt.navigation := dbms_aq.first_message;
  dopt.visibility := dbms_aq.immediate;
  dopt.wait := 10;

  dbms_aq.dequeue('aqadmin.source_queue', dopt, mprop, payload_t, deq_msgid);

  INSERT INTO code_processed
  (id, source)
  VALUES
  (payload_t.id, payload_t.source);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('All queue messages processed');
    COMMIT;
END demo_dequeue;
/
 
CREATE OR REPLACE PUBLIC SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;

GRANT execute ON demo_dequeue TO recvr;

conn recvr/oracle1

CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;

conn aqadmin/oracle1

-- create procedure to enqueue messages
/* -- ENQUEUE_OPTIONS_T definition
TYPE ENQUEUE_OPTIONS_T IS RECORD (
visibility         BINARY_INTEGER DEFAULT ON_COMMIT,
relative_msgid     RAW(16) DEFAULT NULL,
sequence_deviation BINARY_INTEGER DEFAULT NULL,
transformation     VARCHAR2(60) DEFAULT NULL);
*/

/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority        BINARY_INTEGER DEFAULT 1,
delay           BINARY_INTEGER DEFAULT NO_DELAY,
expiration      BINARY_INTEGER DEFAULT NEVER,
correlation     VARCHAR2(128) DEFAULT NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time    DATE,
state           BINARY_INTEGER,
sender_id       aq$_agent DEFAULT NULL,
original_msgid  RAW(16) DEFAULT NULL);
*/

/* -- ENQUEUE syntax
dbms_aq.enqueue(
queue_name         IN  VARCHAR2,
enqueue_options    IN  enqueue_options_t,
message_properties IN  message_properties_t,
payload            IN  <user_defined_data_type>,
msgid              OUT RAW);
*/
 
CREATE OR REPLACE PROCEDURE demo_enqueue(usermsg MESSAGE_T) AUTHID DEFINER IS
 enq_msgid RAW(16);
 eopt      dbms_aq.enqueue_options_t;
 mprop     dbms_aq.message_properties_t;
 aprop     sys.aq$_agent;

 pragma autonomous_transaction;
BEGIN
  aprop := sys.aq$_agent(USER, NULL, 0);

  mprop.priority := 1;
  mprop.sender_id := aprop;
  mprop.delay := dbms_aq.no_delay;
  mprop.expiration := 1800;  -- push to exception queue in 30 min.

  dbms_aq.enqueue('aqadmin.source_queue', eopt, mprop, usermsg, enq_msgid);
  COMMIT;
END demo_enqueue;
/

CREATE OR REPLACE PUBLIC SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;

GRANT execute ON demo_enqueue TO loader;

conn loader/oracle1

CREATE SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;
 
Step 7: Write Enqueues Message Procedure
conn aqadmin/oracle1

CREATE SEQUENCE seq NOORDER;

desc user_sequences

SELECT sequence_name, order_flag
FROM user_sequences;

CREATE OR REPLACE PROCEDURE enq_source IS
 usermsg  aqadmin.message_t;
 x        PLS_INTEGER;
BEGIN
  FOR rec IN (SELECT text FROM dba_source) LOOP
    SELECT seq.NEXTVAL
    INTO x
    FROM dual;

    usermsg := aqadmin.message_t(x, rec.text);
    demo_enqueue(usermsg);

--     dbms_lock.sleep(3);
  END LOOP;
END enq_source;
/

CREATE PUBLIC SYNONYM enq_source FOR aqadmin.enq_source;

GRANT execute ON enq_source TO loader;
 
Step 8: Dequeues Message Procedure
conn aqadmin/oracle1

CREATE OR REPLACE PROCEDURE deq_source AUTHID DEFINER IS
 qlist          dbms_aq.aq$_agent_list_t;
 agent_w_msg    sys.aq$_agent;
 listen_timeout EXCEPTION;
 pragma exception_init(listen_timeout, -25254);
BEGIN
  qlist(0) := sys.aq$_agent(USER, 'AQADMIN.SOURCE_QUEUE', NULL);
  /* if retrieving message for multiple users simultaneously example
  qlist(0) := sys.aq$_agent('GenPharm', 'AQADMIN.RX_QUEUE', NULL);
  qlist(1) := sys.aq$_agent('ICUPharm', 'AQADMIN.RX_QUEUE', NULL);
  */


  LOOP
    BEGIN
      dbms_aq.listen(agent_list => qlist, wait => 15, agent => agent_w_msg);
      demo_dequeue;
    EXCEPTION
      WHEN listen_timeout THEN
        EXIT;
    END;
  END LOOP;
END deq_source;
/

CREATE PUBLIC SYNONYM deq_source FOR aqadmin.deq_source;

GRANT execute ON deq_source TO recvr;
 
Step 9: To Run Demo
The following requires that you simultaneously open three SQL*Plus sessions. In the first session log on as the message receiver and type step 2 but do not execute it. In the second session do the same thing as the loader. Again not executing the stored procedure. Then log on as the aqadmin, set up the SQL*Plus environment and execute one of the two queries then enter a slash (to repeat the query) but do not press the <Enter> key.
Step Session 1 Session 2 Session 3
  Dequeue Messages Enqueue Messages Monitor Performance
1 conn recvr/oracle1 conn loader/oracle1 conn aqadmin/oracle1
2 exec deq_source exec enq_source  

conn aqadmin/oracle1

-- count the number of messages received
SELECT COUNT(*) FROM code_processed;

-- AQ stats
SELECT *
FROM gv$aq;

SELECT COUNT(*)
FROM source_queue_table;

col name format a40

SELECT inst_id, latch#, name, gets, misses, sleeps, spin_gets
FROM gv$latch
WHERE name LIKE '%AQ%';

SELECT inst_id, latch#, name, immediate_gets, immediate_misses, waiters_woken, waits_holding_latch
FROM gv$latch
WHERE name LIKE '%AQ%';

SELECT inst_id, latch#, name, spin_gets, sleep1, sleep2, sleep3, sleep4
FROM gv$latch
WHERE name LIKE '%AQ%';

SELECT inst_id, latch#, name, sleep5, sleep6, sleep7, sleep8, sleep9, sleep10
FROM gv$latch
WHERE name LIKE '%AQ%';

SELECT inst_id, latch#, name, sleep11, wait_time
FROM gv$latch
WHERE name LIKE '%AQ%';

-- remember this query because we are going to make some modifications to the queue table
SELECT *
FROM sys.aq$_queue_table_affinities;

SELECT t.schema, t.name, t.flags, q.name
FROM system.aq$_queue_tables t, sys.aq$_queue_table_affinities aft, system.aq$_queues q
WHERE aft.table_objno = t.objno
AND aft.owner_instance = 1
AND q.table_objno = t.objno
AND q.usage = 0 and bitand(t.flags, 4+16+32+64+128+256) = 0;

-- cache fusion query 1
col "AVG RECEIVE TIME (ms)" FORMAT 9999999.9
col inst_id format 9999

SELECT b1.inst_id, b2.value "RECEIVED", b1.value "RECEIVE TIME", ((b1.value / b2.value) * 10) "AVG RECEIVE TIME (ms)"
FROM gv$sysstat b1, gv$sysstat b2
WHERE b1.name = 'gc cr block receive time'
AND b2.name = 'gc cr blocks received'
AND b1.inst_id = b2.inst_id;

-- cache fusion query 2
col "AVG RECEIVE TIME (ms)" format 9999999.9
col inst_id FORMAT 9999

SELECT b1.inst_id, b2.value "RECEIVED", b1.value "RECEIVE TIME", ((b1.value / b2.value) * 10) "AVG RECEIVE TIME (ms)"
FROM gv$sysstat b1, gv$sysstat b2
WHERE b1.name = 'gc current block receive time' 
AND b2.name = 'gc current blocks received' 
AND b1.inst_id = b2.inst_id;

Then, with everything set up ... go to the first window and hit the <Enter> key, then the second, then the third and finally the same in the AQADMIN session. Continue to monitor the AQADMIN session while the Loader and Receiver enqueue and dequeue messages.
 
 
Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: © 2012 Daniel A. Morgan All Rights Reserved