在 Oracle 數據庫中實現 MapReduce
在程序員開發并行程序時,Map-Reduce模式正變得流行起來。這些map-reduce程序通常來并行處理大量數據。本文來演示如何在Oracle數據庫上,通過使用Parallel Pipelined Table函數及并行操作,來實現Map-Reduce程序。(譯者注:table()是oracle中一個函數,可以把定義為Pipelined的function的返回結果進行SQL查詢)
原理:
Pipelined Table函數是在Oracle 9i引入的,作為能在數據流中嵌入過程邏輯代碼方法。從邏輯上說,一個Table函數是可以出現在from子句中,該函數就像數據表一樣的返回多行數據。Table函數同樣也可以接收多行數據做為輸入參數。大多數情況下,Pipelined Table函數可以嵌入到一個數據流中,它讓數據“流”進SQL語句中,從而避免增加一個物理層(直譯:具體化的中介)。再次說明,Pipelined Table函數是可以并行處理的。
為了并行Table函數,開發人員必須指定指定一個鍵對輸入數據進行重定位。Table函數可以直接在PL/SQL, Java, and 中實現,你可以查到關于Table函數的更多信息、例子以及上面提到的那些功能,網址是:http://download.oracle.com/docs/cd/B10501_01/appdev.920/a96624/08_subs.htm#19677
在多個發行版中,Pipelined Table函數已經被用戶使用,并成為Oracle可擴展基礎功能的一個核心部分。無論是外部用戶,還是Oracle的開發部門,Table函數成為一個有效的、簡單的擴充數據庫核心功能的方法。
類似Table函數的功能已經在Oracle內使用,并且是Oracle Spatial 和Oracle Warehouse Builder許多特色功能的實現方式。Oracle Spatial(空間數據處理系統)使用它涉及spatial joins 和許多 spatial data的數據挖掘的操作。Oracle Warehouse Builder讓讓用戶使用Table 函數對數據流進行并行處理的邏輯,比如Match-Merge 算法和其它逐行計算的算法。
手把手的例子
所有的例子都在omr.sql文件中。
為了說明并行的使用方法以及用Pipelined Table函數在Oracle數據庫內寫一個Map-Reduce算法, 我們實現一個最經典的map-reduce例子--單詞計數。單詞計數是實現返回一組文檔中所有不重復單詞出現的個數的程序,也可以說是查詢單詞出現頻率功能。
示例代碼是用PL/SQL實現,但如前所說,Oracle允許你選擇其它語言來實現這個過程邏輯。
1、配置環境
我們將在一組文檔中查找,這些文檔可以是數據庫之外的文件中,也可以保存在Secure Files/CLOB的數據庫內的列中。在我們這個存文檔的表也相當于一個文件系統。
在本例中,我們將在數據庫內創建一個表,用下面的聲明:
CREATE TABLE documents (a CLOB)
</blockquote>
LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);該表的每一行都對應一個文檔,我們在用下面的語句,這個表中插入三個簡單的文檔:
INSERT INTO documents VALUES ('abc def');
</blockquote>
INSERT INTO documents VALUES ('def ghi');
INSERT INTO documents VALUES ('ghi jkl');
commit;map代碼和reduce代碼都將包含在一個包中,保持代碼的整潔。為了展示這些步驟,我將把這些代碼段從包中拿出來,在下面各小節展示。在實際的包中,還必須要定義幾個types。所有代碼均在Oracle Database 11g (11.1.0.6)測試通過。
2、創建Mapper and the Reducer
首先我們要創建一個普通的map函數來給文檔做標記。記住,我們不是要展示這個map函數有多么好,而是要表達這在數據庫工作的原理。這個map函數非常基本,其它地方也可能有更好的實現。
你可以使用數據庫的聚合引擎及僅map函數來得到最終結果。一個請求和結果看起來是: SQL完成聚合操作,不需要reducer的函數。
當然,你也可以寫自己的聚合的Table函數來計算單詞的出現次數。如果你不用oracle的聚合引擎的話,你必須自己來寫map-reduce的程序。這個聚合Table函數就相當于map-reduce中的reducer部分。
Table函數要求輸入必須按單詞分組,需要將數據排序(用oracle 執行引擎的sort)或單詞分簇。我們展示一個簡單的記數程序在本文中。
第3步 ,數據庫中進行map-reduce
當你寫完mapper and the reducer后,你就可以在數據庫中進行map-reduce.執行一個包含Table函數的請求,就能對外部文檔進行并行的按照map-reduce的代碼執行。
總結
Oracle Table函數是經得起驗證的技術,并在Oracle的內外廣泛使用的擴展Oracle11g的技術。
Oracle Table函數是穩定并可擴展的方法,在Oracle數據庫內實現Map-Reduce,并且能夠利用Oracle并行執行框架的擴展性。在SQL中利用它,能讓數據庫開發人員用自己熟悉的環境和語言,為他們提供一個有效的、簡單的機制去實現Map-Reduce方法。
你可以下載orm.sql,沒有什么特殊的權限需求。
附:orm.sql代碼
CREATE TABLE documents (a CLOB) LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);INSERT INTO documents VALUES ('abc def'); INSERT INTO documents VALUES ('def ghi'); INSERT INTO documents VALUES ('ghi jkl'); commit;
create or replace package oracle_map_reduce is
type word_t is record (word varchar2(4000)); type words_t is table of word_t;
type word_cur_t is ref cursor return word_t; type wordcnt_t is record (word varchar2(4000), count number); type wordcnts_t is table of wordcnt_t;
function mapper(doc in sys_refcursor, sep in varchar2) return words_t pipelined parallel_enable (partition doc by any);
function reducer(in_cur in word_cur_t) return wordcnts_t pipelined parallel_enable (partition in_cur by hash(word)) cluster in_cur by (word);
end; /
create or replace package body oracle_map_reduce is
-- -- The mapper is a simple tokenizer that tokenizes the input documents -- and emits individual words -- function mapper(doc in sys_refcursor, sep in varchar2) return words_t pipelined parallel_enable (partition doc by any) is document clob; istart number; pos number; len number; word_rec word_t; begin
-- for every document loop
fetch doc into document; exit when doc%notfound;
istart := 1; len := length(document);
-- For every word within a document while (istart <= len) loop pos := instr(document, sep, istart);
if (pos = 0) then word_rec.word := substr(document, istart); pipe row (word_rec); istart := len + 1; else word_rec.word := substr(document, istart, pos - istart); pipe row (word_rec); istart := pos + 1; end if;
end loop; -- end loop for a single document
end loop; -- end loop for all documents
return;
end mapper;
-- -- The reducer emits words and the number of times they're seen -- function reducer(in_cur in word_cur_t) return wordcnts_t pipelined parallel_enable (partition in_cur by hash(word)) cluster in_cur by (word) is word_count wordcnt_t; next varchar2(4000); begin
word_count.count := 0;
loop
fetch in_cur into next; exit when in_cur%notfound;
if (word_count.word is null) then
word_count.word := next; word_count.count := word_count.count + 1;
elsif (next <> word_count.word) then
pipe row (word_count); word_count.word := next; word_count.count := 1;
else
word_count.count := word_count.count + 1;
end if;
end loop;
if word_count.count <> 0 then pipe row (word_count); end if;
return;
end reducer;
end; /
-- Select statements
select word, count(*) from ( select value(map_result).word word from table(oracle_map_reduce.mapper(cursor(select a from documents), ' ')) map_result) group by (word);
select * from table(oracle_map_reduce.reducer( cursor(select value(map_result).word word from table(oracle_map_reduce.mapper( cursor(select a from documents), ' ')) map_result)));</pre>
本文地址:http://www.oschina.net/translate/in-database_map-reduce
原文地址:https://blogs.oracle.com/datawarehousing/entry/in-database_map-reduce
</div>本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!