MapReduce + Oracle = Tablefunctions
我們在OpenWorld大會做的其中一件事,是漂亮的展示了如何在通用的Oracle數據庫之上實現MapReduce系統。這里基于在這個博客上,顯示了很好的實施tablefunctions和映射器等.
但后來我們想,為什么不經過tablefunction代碼和MapReduce范例一種映射來告訴大家在Oracle中構建存在,并且如何利用Oracle創建一個數據處理/分析管道...所以這里是一些我們在OpenWorld大會中正在使用的代碼.
承上啟下,首先我們討論的標題,高亮有趣的片段與代碼注釋,然后我們討論的主體和實際(簡單)映射器,減速機代碼。該意見是很有希望使這個東西不言自明的...
Scenario
我們在這里做的相當簡單。創建一個簡單的表,表里有一些記錄和循環。減速器是做一個聚集。步驟如下:
CREATE TABLE sls (salesman VARCHAR2(30), quantity number)
/
INSERT INTO sls VALUES('Tom', 100);
INSERT INTO sls VALUES('Chu', 200);
INSERT INTO sls VALUES('Tom', 300);
INSERT INTO sls VALUES('Mike', 100);
INSERT INTO sls VALUES('Scott', 300);
INSERT INTO sls VALUES('Tom', 250);
INSERT INTO sls VALUES('Scott', 100);
commit;
/
Header
create or replace package oracle_map_reduce
is
-- The types we define here is similar to the input files
-- and output files that are used in MR code and are used to
-- store data while we run the actual package.
-- The big advantage is that we do not need to write to disk for
-- intermediate results.
type sales_t is table of sls%rowtype;
type sale_cur_t is ref cursor return sls%rowtype;
type sale_rec_t is record (name varchar2(30), total number);
type total_sales_t is table of sale_rec_t;
-- Next we define the funtions that do the work and make them known
-- to the outside world
-- Note that both mapper and reducer are tablefunctions!
-- Both mapper and reducer are pipelined and executable in parallel
-- the parallel degree is driven from the database side and is not
-- scheduled by the actual program
function mapper(inp_cur in sys_refcursor) return sales_t
pipelined parallel_enable (partition inp_cur by any);
-- the pipelined keyword tells the caller that this function acts as
-- a row source
--
-- parallel_enable indicates that this function can be executed in parallel
-- by the parallel query framework.
function reducer(in_cur in sale_cur_t) return total_sales_t
pipelined parallel_enable (partition in_cur by hash(salesman))
-- Finally we can cluster the results so that similar rows are chunked
-- together when used (note this does not drive distribution over the
-- parallel slaves, which is done by the partition clause shown in the mapper
-- and reducers)
cluster in_cur by (salesman);
end;
/
-- The body of the package has the mapper and the reducer code
-- The header as is shown here by itself defines the signature of
-- the package and declares types and variables to be used in the
-- package.
Body
create or replace package oracle_map_reduce
is
type sales_t is table of sls%rowtype;
type sale_cur_t is ref cursor return sls%rowtype;
type sale_rec_t is record (name varchar2(30), total number);
type total_sales_t is table of sale_rec_t;
function mapper(inp_cur in sys_refcursor) return sales_t
pipelined parallel_enable (partition inp_cur by any);
function reducer(in_cur in sale_cur_t) return total_sales_t
pipelined parallel_enable (partition in_cur by hash(salesman))
cluster in_cur by (salesman);
end;
/
-- The upper part is the header the following part if the body
-- Note the difference in the create statement below as compared
-- to the header
create or replace package body oracle_map_reduce
is
function mapper(inp_cur in sys_refcursor) return sales_t
pipelined parallel_enable (partition inp_cur by any)
is
sales_rec sls%ROWTYPE;
-- construct a record to hold an entire row from the SLS table
begin
-- First loop over all records in the table
loop
fetch inp_cur into sales_rec;
exit when inp_cur%notfound;
-- Place the found records from SLS into the variable
-- end the loop when there are no more rows to loop over
pipe row (sales_rec);
-- by using pipe row here we are giving back rows in streaming
-- fashion as you would expect from a table
-- this in combination with pipelined in the definition allows
-- the pipelining (e.g. giving data as it comes on board) of
-- a table function
end loop;
return;
-- Return is a mandatory piece that allows the consumer of data (our reducer
-- in this case)
-- to ensure all data has been sent. After return the rowsource is exhausted
-- and no more data comes from this function.
end mapper;
-- The above mapper does in effect nothing other than streaming data
-- partitioned
-- over to the next step. In MR the stream would be written to a file and then -- redistributed to the reducers
-- The reducer below computes and emits the sales figures
function reducer(in_cur in sale_cur_t) return total_sales_t
pipelined parallel_enable (partition in_cur by hash(salesman))
-- The partition by clause indicates that all instances of a particular
-- salesman must be sent to one instances of the reducer function
cluster in_cur by (salesman)
-- The cluster by clause tells the parallel query framework to cluster
-- all instances of a particular salesman together.
IS
sale_rec sls%ROWTYPE;
total_sale_rec sale_rec_t;
-- two containers are created, one as input the other as output
begin
total_sale_rec.total := 0;
total_sale_rec.name := NULL;
-- reset the values to initial values
loop
fetch in_cur into sale_rec;
exit when in_cur%notfound;
-- some if then logic to ensure we pipe a row once all is processed
if (total_sale_rec.name is null) then
-- The first instance is arriving, set the salesman value to that
-- input value
-- update 0 plus the incoming value for total
total_sale_rec.name := sale_rec.salesman;
total_sale_rec.total := total_sale_rec.total +
sale_rec.quantity;
elsif ( total_sale_rec.name <> sale_rec.salesman) then
-- We now switch sales man, and are done with the first
-- salesman (as rows are partitioned and clustered)
-- First pipe out the result of the previous salesman we
-- processed
-- then update the information to work on this new salesman
pipe row (total_sale_rec);
total_sale_rec.name := sale_rec.salesman;
total_sale_rec.total := sale_rec.quantity;
else
-- We get here when we work on the same salesman and just add
-- the totals, the move on to the next record
total_sale_rec.total := total_sale_rec.total +
sale_rec.quantity;
end if;
end loop;
-- The next piece of code ensures that any remaining rows that
-- have not been piped out
-- are piped out to the consumer. If there is a single salesman,
-- he is only piped out
-- in this piece of logic as we (in the above example code) only
-- pipe out upon a change
-- of salesman
if total_sale_rec.total<> 0 then
pipe row (total_sale_rec);
end if;
return;
-- Again, we are now done and have piped all rows to our consumer
end reducer;
end;
/
在一個SQL 查詢中使用它
花了一點時間,但是一旦你看到查詢,你就可以知道如何建立一系列預定義的程序,然后你可以實現
串在一起獲得一組結果集。
select *
from table(oracle_map_reduce.reducer(cursor(
select * from table(oracle_map_reduce.mapper(cursor(
select * from sls))) map_result)));
所有的邏輯管道數據都到下一個消費者,并且所有都是并行運行的。這使得它適合類似重型數據庫ETL(我們首先為了它發明的)的任何東西,并且任何需要應用大量邏輯到記錄的東西(像分析處理)。