Oracle Pipelined Table Functions
Version 21c

General Information
Library Note Morgan's Library Page Header
Which has the higher priority in your organization: Deploying a new database or securing the ones you already have? Looking for a website, and resources, dedicated solely to securing Oracle databases? Check out DBSecWorx.
Data Dictionary Objects
ALL_ARGUMENTS CDB_OBJECT_SIZE ERROR$
ALL_ERRORS CDB_PROCEDURES SOURCE$
ALL_OBJECT_SIZE CDB_SOURCE_AE USER_ARGUMENTS
ALL_PROCEDURES DBA_ARGUMENTS USER_ERRORS
ALL_SOURCE_AE DBA_ERRORS USER_OBJECT_SIZE
ARGUMENT$ DBA_OBJECT_SIZE USER_PROCEDURES
CDB_ARGUMENTS DBA_PROCEDURES USER_SOURCE_AE
CDB_ERRORS DBA_SOURCE_AE  
Object Privileges Privileges to tables and views granted through roles may not be valid within a procedure. See the section on AUTHID.

GRANT execute ON <object_name> TO <user_name>;
System Privileges
ALTER ANY PROCEDURE DEBUG ANY PROCEDURE EXECUTE ANY PROCEDURE
CREATE ANY PROCEDURE DROP ANY PROCEDURE SELECT ANY TABLE
CREATE PROCEDURE    
Syntax CREATE OR REPLACE FUNCTION <schema_name>.<function_name>
(<argument> [IN | OUT | IN OUT] [NOCOPY] <data type>)
RETURN <data type>
[AUTHID <CURRENT USER | DEFINER>]
[<AGGREGATE | PIPELINED>]
[PARALLEL_ENABLE (PARTITION <argument> BY [<HASH, RANGE> (<column_list>), ANY])] IS
 <constant_and_variable_declarations>
 <exception_declarations>
 <pragma_declarations>
BEGIN
  <function_body>
END <function_name>;
/
 
Tables And Data For Demo
Table Definition CREATE TABLE stocktable (
ticker       VARCHAR2(4),
open_price   NUMBER(10),
close_price  NUMBER(10));
Demo Data INSERT INTO stocktable VALUES ('ORCL', 13, 16);
INSERT INTO stocktable VALUES ('MSFT', 35, 29);
INSERT INTO stocktable VALUES ('SUNW', 7, 11);
COMMIT;
Type Definition CREATE OR REPLACE TYPE TickerType AUTHID DEFINER AS OBJECT(
ticker    VARCHAR2(4),
pricetype VARCHAR2(1),
price     NUMBER(10));
/
Create Table Type CREATE OR REPLACE TYPE TickerTypeSet AS TABLE OF TickerType;
/
Create Package CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS
 TYPE refcur_t IS REF CURSOR RETURN StockTable%ROWTYPE;
END refcur_pkg;
/
Create Table Function CREATE OR REPLACE FUNCTION stockpivot(p refcur_pkg.refcur_t)
RETURN TickerTypeSet AUTHID DEFINER PIPELINED IS
 out_rec  TickerType := TickerType(NULL,NULL,NULL);
 in_rec   p%ROWTYPE;
BEGIN
  LOOP
    FETCH p INTO in_rec;
    EXIT WHEN p%NOTFOUND;

    out_rec.ticker := in_rec.Ticker;
    out_rec.pricetype := 'O';
    out_rec.price := in_rec.Open_Price;
    PIPE ROW(out_rec);

    out_rec.PriceType := 'C';
    out_rec.price := in_rec.Close_Price;
    PIPE ROW(out_rec);
  END LOOP;
  CLOSE p;
  RETURN;
END stockpivot;
/

desc stockpivot

set linesize 121
col pipelined format a10

SELECT object_name, pipelined, authid
FROM user_procedures;
Sample Query SELECT *
FROM TABLE(stockpivot(CURSOR(SELECT * FROM StockTable)));
A related pivot based on a different table compared with straight SQL CREATE OR REPLACE TYPE AirType AUTHID DEFINER AS OBJECT(
program_id VARCHAR2(3),
line_number NUMBER(10),
customer_id VARCHAR2(4),
order_date DATE,
delivered_date DATE);
/

CREATE OR REPLACE TYPE AirTypeSet AS TABLE OF AirType;
/

CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS
TYPE refcur_t IS REF CURSOR RETURN airplanes%ROWTYPE;
END refcur_pkg;
/

CREATE OR REPLACE FUNCTION airpivot(p refcur_pkg.refcur_t)
RETURN AirTypeSet AUTHID DEFINER PIPELINED IS
 out_rec AirType := AirType(NULL,NULL,NULL,NULL,NULL);
 in_rec  p%ROWTYPE;
BEGIN
  LOOP
    FETCH p INTO in_rec;
    EXIT WHEN p%NOTFOUND;

    out_rec.program_id := in_rec.program_id;
    out_rec.line_number := in_rec.line_number;
    out_rec.customer_id := in_rec.customer_id;
    out_rec.order_date := in_rec.order_date;
    out_rec.delivered_date := in_rec.delivered_date;

    PIPE ROW(out_rec);
  END LOOP;
  CLOSE p;
  RETURN;
END airpivot;
/

set timing on

SELECT program_id, SUM(line_number)
FROM TABLE(airpivot(CURSOR(SELECT * FROM airplanes)))
GROUP BY program_id;

SELECT program_id, SUM(line_number)
FROM airplanes
GROUP BY program_id;

CREATE OR REPLACE TYPE d_vtyp AS VARRAY(5000) OF AirType;
/

SELECT program_id, SUM(e.line_number)
FROM TABLE(CAST(MULTISET(SELECT * FROM airplanes) AS d_vtyp)) e
GROUP BY program_id;


-- the lesson is clear ... only use a pipelined table function when SQL will not work
 
Date Generator
Generate Date List CREATE OR REPLACE TYPE date_array AS TABLE OF DATE;
/

CREATE OR REPLACE FUNCTION date_table(sdate DATE, edate DATE)
RETURN date_array AUTHID CURRENT_USER PIPELINED AS
BEGIN
  FOR i IN 0 .. (edate - sdate) LOOP
    PIPE ROW(sdate + i);
  END LOOP;
  RETURN;
END date_table;
/

desc date_table

SELECT object_name, pipelined, authid
FROM user_procedures;

SELECT *
FROM TABLE(CAST(date_table(TRUNC(SYSDATE-30), TRUNC(SYSDATE))
AS date_array));

-- joined with another table

CREATE TABLE testdata (
datecol DATE,
someval NUMBER);

INSERT INTO testdata VALUES (TRUNC(SYSDATE-25), 25);
INSERT INTO testdata VALUES (TRUNC(SYSDATE-20), 20);
INSERT INTO testdata VALUES (TRUNC(SYSDATE-15), 15);
INSERT INTO testdata VALUES (TRUNC(SYSDATE-10), 10);
INSERT INTO testdata VALUES (TRUNC(SYSDATE-5), 5);
COMMIT;

SELECT * FROM testdata;

SELECT da.column_value AS DATECOL, td.someval
FROM TABLE(CAST(date_table(TRUNC(SYSDATE-30), TRUNC(SYSDATE))
AS date_array)) da, testdata td
WHERE da.COLUMN_VALUE = td.datecol(+);

Note: A SQL alternative would be:
SELECT iv.datecol, td.someval
FROM (
WITH dates AS (SELECT SYSDATE-30 dt_start, SYSDATE dt_end FROM dual)
SELECT dt_start+rownum-1 AS DATECOL
FROM dates
CONNECT BY LEVEL <= dt_end-dt_start) iv, testdata td
WHERE TRUNC(iv.datecol) = TRUNC(td.datecol (+))
ORDER BY datecol;
 
Comma Delimited List
Turning a comma delimited list into a row-by-row output CREATE OR REPLACE TYPE str_array AS TABLE OF VARCHAR2(10);
/

CREATE OR REPLACE FUNCTION ptf(stringin VARCHAR2) RETURN str_array PIPELINED IS
 i   PLS_INTEGER;
 str VARCHAR2(100);
 tab sys.dbms_utility.uncl_array;
BEGIN
  str := '"' || REPLACE(stringin, ',', '","') || '"';
  sys.dbms_utility.comma_to_table(str, i, tab);

  FOR j IN 1 .. 5 LOOP
    PIPE ROW(TRANSLATE(tab(j),'A"','A'));
  END LOOP;
  RETURN;
END ptf;
/

SELECT *
FROM TABLE(CAST(ptf('1001,1002,1003,1004,1005') AS str_array));
 
Real-Time DBMS_OUTPUT Output
A demonstration of using a PTE to force real-time output from inside a loop with the DBMS_OUTPUT package CREATE OR REPLACE TYPE msgType AS TABLE OF VARCHAR2(60);
/

CREATE OR REPLACE FUNCTION msgOutStream RETURN msgType
PIPELINED
AUTHID CURRENT_USER AS
BEGIN
  PIPE ROW('start run ' || sysTimestamp);
  FOR i IN 1 .. 10 LOOP
    PIPE ROW('output ' || TO_CHAR(i) || ' ' || sysTimestamp);
    dbms_lock.sleep(1);
    PIPE ROW('output ' || TO_CHAR(i) || ' completed');
  END LOOP;
  RETURN;
END;
/

SELECT * FROM TABLE(msgOutStream);

SET ARRAYSIZE 1

SELECT * FROM TABLE(msgOutStream);

-- remember to return your arraysize to its original value
SET ARRAYSIZE 250
 
DBA Dependencies Investigator
A demonstration of using a PTE to return dependent objects ... in this case, for the library, excluding self-references (for example synonyms), and STANDARD CREATE OR REPLACE TYPE name_t AS TABLE OF VARCHAR2(30);
/

CREATE OR REPLACE FUNCTION dependencies(pkg_name IN VARCHAR2)
RETURN name_t AUTHID CURRENT_USER PIPELINED AS
 CURSOR dcur IS
 SELECT name FROM dba_dependencies
 WHERE referenced_name = pkg_name
 AND name <> pkg_name
 AND name <> 'STANDARD'
 UNION
 SELECT referenced_name from dba_dependencies
 WHERE name = pkg_name
 AND referenced_name <> pkg_name
 AND referenced_name <> 'STANDARD';
BEGIN
  FOR drec IN dcur LOOP
    PIPE ROW(drec.name);
  END LOOP;
  RETURN;
END dependencies;
/

SELECT * FROM TABLE(CAST(dependencies('OWA_OPT_LOCK') AS name_t));
 
Another Pivot Demo
I'm not sure where or when I got this but it looks like something Tom would have written so I will give that attribution until some other origin is established. CREATE OR REPLACE TYPE virtual_table_type AS TABLE OF number;
/

CREATE OR REPLACE FUNCTION virtual_table(p_num_rows IN NUMBER)
RETURN virtual_table_type AUTHID CURRENT_USER PIPELINED IS
BEGIN
  FOR i IN 1 .. p_num_rows LOOP
    dbms_output.put_line('going to pipe');
    PIPE ROW( i );
    dbms_output.put_line('done pipeing');
  END LOOP;
  RETURN;
END virtual_table;
/

SELECT * FROM TABLE(virtual_table(5));
SELECT * FROM TABLE(virtual_table(10));

set serveroutput on

BEGIN
  FOR x IN (SELECT * FROM TABLE(virtual_table(10))) LOOP
    dbms_output.put_line('Fetching.... ' || x.column_value);
  END LOOP;
END;
/

CREATE OR REPLACE TYPE myScalarType AS OBJECT (
c1 VARCHAR2(9),
c2 VARCHAR2(9),
c3 VARCHAR2(9),
c4 VARCHAR2(9),
c5 VARCHAR2(9),
c6 VARCHAR2(9),
c7 VARCHAR2(9));
/

desc myScalarType

CREATE OR REPLACE TYPE myArrayType AS TABLE OF myScalarType;
/

desc myArrayType

CREATE OR REPLACE FUNCTION pivot(p_cur IN sys_refcursor)
RETURN myArrayType AUTHID CURRENT_USER PIPELINED IS
 l_c1 varchar2(4000);
 l_c2 varchar2(4000);
 l_last varchar2(4000);
 l_cnt number ;
 l_data myScalarType;
BEGIN
  LOOP
    FETCH p_cur INTO l_c1, l_c2;
    EXIT WHEN p_cur%NOTFOUND;

    IF (l_last IS NULL OR l_c1 <> l_last) THEN
      IF (l_data IS NOT NULL) THEN
        pipe row(l_data);
      END IF;

      l_data := myScalarType(l_c1, l_c2, NULL, NULL, NULL, NULL, NULL);
      l_cnt := 3;
      l_last := l_c1;
    ELSE
      CASE l_cnt
      WHEN 3 THEN l_data.c3 := l_c2;
      WHEN 4 THEN l_data.c4 := l_c2;
      WHEN 5 THEN l_data.c5 := l_c2;
      WHEN 6 THEN l_data.c6 := l_c2;
      WHEN 7 THEN l_data.c7 := l_c2;
      ELSE raise program_error;
      END CASE;

      l_cnt := l_cnt+1;
    END IF;
  END LOOP;

  IF (l_data IS NOT NULL) THEN
    PIPE ROW(l_data);
  END IF;
  CLOSE p_cur;
  RETURN;
END pivot;
/

SELECT *
FROM TABLE(pivot(CURSOR(
  SELECT deptno, ename FROM scott.emp ORDER BY deptno)));

SELECT *
FROM TABLE(pivot(
CURSOR(SELECT deptno, hiredate FROM scott.emp ORDER BY deptno)));
 
Service Stats Solution Demo
Another PTF Demo based on code posted by James Colestock conn sys@pdbdev as sysdba

ALTER SESSION ENABLE PARALLEL DML;

CREATE SEQUENCE metrics_seq;

CREATE TABLE metrics (
metric_id  NUMBER,
class_id   NUMBER       NOT NULL,
class_name VARCHAR2(20) NOT NULL,
stat_id    NUMBER       NOT NULL,
stat_name  VARCHAR2(64) NOT NULL);

ALTER TABLE metrics
ADD CONSTRAINT pk_metrics
PRIMARY KEY;

ALTER TABLE metrics
ADD CONSTRAINT uk_metrics
UNIQUE (class_id, class_name, stat_id, stat_name);

CREATE OR REPLACE TYPE rec_metrics_type AUTHID DEFINER IS OBJECT (
metric_id  NUMBER,
class_id   NUMBER,
class_name VARCHAR2(20),
stat_id    NUMBER,
stat_name  VARCHAR2(64));
/

CREATE OR REPLACE TYPE tbl_metrics_type AS TABLE OF rec_metrics_type;
/

CREATE DIMENSION metrics_dim
LEVEL     class_level IS (metrics.class_id)
LEVEL     stat_level  IS (metrics.stat_id)
HIERARCHY metric_hier (stat_level CHILD OF class_level)
ATTRIBUTE class_level DETERMINES (metrics.class_name)
ATTRIBUTE stat_level  DETERMINES (metrics.stat_name);

CREATE OR REPLACE VIEW metrics_vw AS
SELECT /*+ PARALLEL (wss, 2) */ s.class class_id,
  DECODE (s.class,
       1, 'User',
       2, 'Redo',
       4, 'Enqueue',
       8, 'Cache',
      16, 'OS',
      32, 'Parallelism',
      40, 'RAC',
      64, 'SQL',
      72, 'Internal Debugging',
     128, 'Debug') class_name,
  wss.stat_id, s.name
FROM sys.wrh$_service_stat wss, sys.v$sysstat s
WHERE wss.stat_id = s.stat_id
ORDER BY s.class;

CREATE OR REPLACE PACKAGE load_dim_pkg AUTHID DEFINER AS
 TYPE metrics_cur IS REF CURSOR RETURN metrics_vw%ROWTYPE;
 FUNCTION get_data(p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type
   PARALLEL_ENABLE(PARTITION p_cur BY ANY) PIPELINED;
END load_dim_pkg;
/

CREATE OR REPLACE PACKAGE BODY load_dim_pkg AS
 FUNCTION get_data(p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type
 PARALLEL_ENABLE(PARTITION p_cur BY ANY) PIPELINED IS
  rec_in rec_metrics_type := rec_metrics_type(NULL, NULL, NULL, NULL, NULL);
  rec_out rec_metrics_type := rec_metrics_type(NULL, NULL, NULL, NULL, NULL);
 BEGIN
   LOOP
     FETCH p_cur INTO rec_in.class_id, rec_in.class_name, rec_in.stat_id,rec_in.stat_name;
     EXIT WHEN p_cur%NOTFOUND;

     rec_out.metric_id := NULL;
     rec_out.class_id := rec_in.class_id;
     rec_out.class_name := INITCAP (rec_in.class_name);
     rec_out.stat_id := rec_in.stat_id;
     rec_out.stat_name := INITCAP (rec_in.stat_name);
     PIPE ROW (rec_out);
   END LOOP;
   RETURN;
 END;
END load_dim_pkg;
/

set autotrace on
set linesize 161

MERGE INTO metrics m USING (
  SELECT * FROM TABLE(load_dim_pkg.get_data(CURSOR(SELECT DISTINCT * FROM metrics_vw)))) s1
  ON (m.stat_id = s1.stat_id)
WHEN MATCHED THEN
  UPDATE SET m.stat_name = s1.stat_name, m.class_id = s1.class_id,
             m.class_name = s1.class_name
WHEN NOT MATCHED THEN
  INSERT
  (metrics_id, class_id, class_name, stat_id, stat_name)
  VALUES
  (metrics_seq.NEXTVAL, s1.class_id, s1.class_name, s1.stat_id, s1.stat_name);

COMMIT;

set autotrace off

SELECT *
FROM sys.v$pq_sesstat
WHERE statistic IN ('Queries Parallelized', 'Server Threads');
 
XML Demo
PTF Returning XML conn uwclass/uwclass@pdbdev

CREATE OR REPLACE TYPE clob_array IS TABLE OF CLOB;
/

CREATE OR REPLACE FUNCTION ret_xml(query_str IN VARCHAR2)
RETURN clob_array PIPELINED AUTHID DEFINER IS
 l_xml XMLTYPE;
BEGIN
  l_xml := XMLTYPE(dbms_xmlgen.getxml(query_str)) ;
  FOR rec IN (SELECT * FROM TABLE(xmlsequence(l_xml.extract('/ROWSET/ROW')))) LOOP
    PIPE ROW (rec.column_value.getCLOBVal()) ;
  END LOOP;
END ret_xml;
/

SELECT * FROM TABLE(ret_xml('SELECT * FROM scott.emp'));
 
Parallel Enabled PTF
Parallel Enable Demo CREATE OR REPLACE PACKAGE pkg_test AUTHID CURRENT_USER IS
 TYPE rt_Src IS RECORD(id NUMBER, col1 VARCHAR2(100));
 TYPE rc_Src IS REF CURSOR RETURN rt_Src;

 TYPE rt_Out IS RECORD(id NUMBER, col1 VARCHAR2(100), sid NUMBER);
 TYPE tt_Out IS TABLE OF rt_Out;

 FUNCTION f_Pipe_Any(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
 PARALLEL_ENABLE(PARTITION
airc_Src BY ANY);

 FUNCTION f_Pipe_Hash(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
 PARALLEL_ENABLE(PARTITION airc_Src BY HASH(id));

 FUNCTION f_Pipe_Range(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
 PARALLEL_ENABLE(PARTITION
airc_Src BY RANGE(id));
END;
/

CREATE OR REPLACE PACKAGE BODY pkg_test IS
FUNCTION f_Pipe_Any(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
PARALLEL_ENABLE(PARTITION
airc_Src BY ANY) IS
 lr_Src rt_Src;
 lr_Out rt_Out;
BEGIN
  LOOP
    FETCH airc_Src INTO lr_Src;
    EXIT WHEN airc_Src%NOTFOUND;

    SELECT sid
    INTO lr_Out.sid
    FROM v$mystat
    WHERE ROWNUM = 1;

    lr_Out.id := lr_Src.id;
    lr_Out.col1 := lr_Src.col1;
    PIPE ROW(lr_Out);
  END LOOP;
  CLOSE airc_Src;
END;

FUNCTION f_Pipe_Hash (airc_Src IN rc_Src) RETURN tt_Out PIPELINED
PARALLEL_ENABLE(PARTITION
airc_Src BY HASH(id)) IS
lr_Src rt_Src;
lr_Out rt_Out;
BEGIN
  LOOP
    FETCH airc_Src INTO lr_Src;
    EXIT WHEN airc_Src%NOTFOUND;

    SELECT sid
    INTO lr_Out.sid
    FROM v$mystat
    WHERE ROWNUM = 1;

    lr_Out.id := lr_Src.id;
    lr_Out.col1 := lr_Src.col1;
    PIPE ROW(lr_Out);
  END LOOP;
  CLOSE airc_Src;
END;

FUNCTION f_Pipe_Range (airc_Src IN rc_Src) RETURN tt_Out PIPELINED
PARALLEL_ENABLE(PARTITION
airc_Src BY RANGE(id)) IS
 lr_Src rt_Src;
 lr_Out rt_Out;
BEGIN
  LOOP
    FETCH airc_Src INTO lr_Src;
    EXIT WHEN airc_Src%NOTFOUND;

    SELECT sid
    INTO lr_Out.sid
    FROM v$mystat
    WHERE ROWNUM = 1;

    lr_Out.id := lr_Src.id;
    lr_Out.col1 := lr_Src.col1;
    PIPE ROW(lr_Out);
  END LOOP;
  CLOSE airc_Src;
END;
END;
/

CREATE TABLE dt_src (
id   NUMBER,
col1 VARCHAR2(100));

INSERT INTO dt_src
SELECT ROWNUM, TO_CHAR(ROWNUM)
FROM dual
CONNECT BY LEVEL <= 100000;

CREATE TABLE dt_out (
id       NUMBER,
col1     VARCHAR2(100),
sid      NUMBER,
src_proc VARCHAR2(32));

set autotrace trace stat explain

INSERT INTO dt_out
SELECT id, col1, sid, 'ANY'
FROM TABLE (pkg_test.f_Pipe_Any(CURSOR(SELECT --+parallel(dt_src)
id, col1 FROM dt_src)));

INSERT INTO dt_out
SELECT id, col1, sid, 'HASH'
FROM TABLE(pkg_test.f_Pipe_Hash(CURSOR(SELECT --+parallel(dt_src)
id, col1 FROM dt_src)));

INSERT INTO dt_out
SELECT id, col1, sid, 'RANGE'
FROM TABLE (pkg_test.f_Pipe_Range(CURSOR(SELECT --+parallel(dt_src)
id, col1 FROM dt_src)));

set autotrace off

SELECT COUNT(*), sid, src_proc
FROM dt_out
GROUP BY sid, src_proc
ORDER BY sid, src_proc;
 
Drop Pipelined Table Functions
Drop PTF DROP FUNCTION <function_name>;
DROP FUNCTION ptf;

DROP FUNCTION pivot;

DROP FUNCTION stockpivot;

Related Topics
Accessible By Clause
Anonymous Block
Built-in Functions
Built-in Packages
DBMS_TF
DBMS_XMLGEN
Exception Handling
Functions
NO_DATA_NEEDED Exception
Pseudocolumns
Tables
Types
What's New In 21c
What's New In 23c

Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: This site is protected by copyright and trademark laws under U.S. and International law. © 1998-2023 Daniel A. Morgan All Rights Reserved
  DBSecWorx