在 Oracle 數據庫中實現 MapReduce

jopen 10年前發布 | 23K 次閱讀 MapReduce Oracle 數據庫服務器

在程序員開發并行程序時,Map-Reduce模式正變得流行起來。這些map-reduce程序通常來并行處理大量數據。本文來演示如何在Oracle數據庫上,通過使用Parallel Pipelined Table函數及并行操作,來實現Map-Reduce程序。(譯者注:table()是oracle中一個函數,可以把定義為Pipelined的function的返回結果進行SQL查詢)

原理:

Pipelined Table函數是在Oracle 9i引入的,作為能在數據流中嵌入過程邏輯代碼方法。從邏輯上說,一個Table函數是可以出現在from子句中,該函數就像數據表一樣的返回多行數據。Table函數同樣也可以接收多行數據做為輸入參數。大多數情況下,Pipelined Table函數可以嵌入到一個數據流中,它讓數據“流”進SQL語句中,從而避免增加一個物理層(直譯:具體化的中介)。再次說明,Pipelined Table函數是可以并行處理的。

為了并行Table函數,開發人員必須指定指定一個鍵對輸入數據進行重定位。Table函數可以直接在PL/SQL, Java, and 中實現,你可以查到關于Table函數的更多信息、例子以及上面提到的那些功能,網址是:http://download.oracle.com/docs/cd/B10501_01/appdev.920/a96624/08_subs.htm#19677

在多個發行版中,Pipelined Table函數已經被用戶使用,并成為Oracle可擴展基礎功能的一個核心部分。無論是外部用戶,還是Oracle的開發部門,Table函數成為一個有效的、簡單的擴充數據庫核心功能的方法。

類似Table函數的功能已經在Oracle內使用,并且是Oracle Spatial 和Oracle Warehouse Builder許多特色功能的實現方式。Oracle Spatial(空間數據處理系統)使用它涉及spatial joins 和許多 spatial data的數據挖掘的操作。Oracle Warehouse Builder讓讓用戶使用Table 函數對數據流進行并行處理的邏輯,比如Match-Merge 算法和其它逐行計算的算法。

手把手的例子

所有的例子都在omr.sql文件中。

為了說明并行的使用方法以及用Pipelined Table函數在Oracle數據庫內寫一個Map-Reduce算法, 我們實現一個最經典的map-reduce例子--單詞計數。單詞計數是實現返回一組文檔中所有不重復單詞出現的個數的程序,也可以說是查詢單詞出現頻率功能。

示例代碼是用PL/SQL實現,但如前所說,Oracle允許你選擇其它語言來實現這個過程邏輯。

1、配置環境

我們將在一組文檔中查找,這些文檔可以是數據庫之外的文件中,也可以保存在Secure Files/CLOB的數據庫內的列中。在我們這個存文檔的表也相當于一個文件系統。

在本例中,我們將在數據庫內創建一個表,用下面的聲明:

CREATE TABLE documents (a CLOB)      
  LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);

</blockquote>

該表的每一行都對應一個文檔,我們在用下面的語句,這個表中插入三個簡單的文檔:

INSERT INTO documents VALUES ('abc def');      
INSERT INTO documents VALUES ('def ghi');        
INSERT INTO documents VALUES ('ghi jkl');        
commit;

</blockquote>

map代碼和reduce代碼都將包含在一個包中,保持代碼的整潔。為了展示這些步驟,我將把這些代碼段從包中拿出來,在下面各小節展示。在實際的包中,還必須要定義幾個types。所有代碼均在Oracle Database 11g (11.1.0.6)測試通過。

2、創建Mapper and the Reducer

首先我們要創建一個普通的map函數來給文檔做標記。記住,我們不是要展示這個map函數有多么好,而是要表達這在數據庫工作的原理。這個map函數非常基本,其它地方也可能有更好的實現。

你可以使用數據庫的聚合引擎及僅map函數來得到最終結果。一個請求和結果看起來是: SQL完成聚合操作,不需要reducer的函數。

當然,你也可以寫自己的聚合的Table函數來計算單詞的出現次數。如果你不用oracle的聚合引擎的話,你必須自己來寫map-reduce的程序。這個聚合Table函數就相當于map-reduce中的reducer部分。

Table函數要求輸入必須按單詞分組,需要將數據排序(用oracle 執行引擎的sort)或單詞分簇。我們展示一個簡單的記數程序在本文中。

第3步 ,數據庫中進行map-reduce

當你寫完mapper and the reducer后,你就可以在數據庫中進行map-reduce.執行一個包含Table函數的請求,就能對外部文檔進行并行的按照map-reduce的代碼執行。

總結

Oracle Table函數是經得起驗證的技術,并在Oracle的內外廣泛使用的擴展Oracle11g的技術。

Oracle Table函數是穩定并可擴展的方法,在Oracle數據庫內實現Map-Reduce,并且能夠利用Oracle并行執行框架的擴展性。在SQL中利用它,能讓數據庫開發人員用自己熟悉的環境和語言,為他們提供一個有效的、簡單的機制去實現Map-Reduce方法。

你可以下載orm.sql,沒有什么特殊的權限需求。

附:orm.sql代碼

CREATE TABLE documents (a CLOB)
  LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);

INSERT INTO documents VALUES ('abc def'); INSERT INTO documents VALUES ('def ghi'); INSERT INTO documents VALUES ('ghi jkl'); commit;

create or replace package oracle_map_reduce is

  type word_t     is record (word varchar2(4000));   type words_t    is table of word_t;

  type word_cur_t is ref cursor return word_t;   type wordcnt_t  is record (word varchar2(4000), count number);   type wordcnts_t is table of wordcnt_t;

  function mapper(doc in sys_refcursor, sep in varchar2) return words_t     pipelined parallel_enable (partition doc by any);

  function reducer(in_cur in word_cur_t) return wordcnts_t     pipelined parallel_enable (partition in_cur by hash(word))     cluster in_cur by (word);

end; /

create or replace package body oracle_map_reduce is

  --   -- The mapper is a simple tokenizer that tokenizes the input documents   -- and emits individual words   --   function mapper(doc in sys_refcursor, sep in varchar2) return words_t     pipelined parallel_enable (partition doc by any)   is     document clob;     istart   number;     pos      number;     len      number;     word_rec word_t;   begin

    -- for every document     loop

      fetch doc into document;       exit when doc%notfound;

      istart := 1;       len := length(document);

      -- For every word within a document       while (istart <= len) loop         pos := instr(document, sep, istart);

        if (pos = 0) then           word_rec.word := substr(document, istart);           pipe row (word_rec);           istart := len + 1;         else           word_rec.word := substr(document, istart, pos - istart);           pipe row (word_rec);           istart := pos + 1;         end if;

      end loop; -- end loop for a single document

    end loop; -- end loop for all documents

    return;

  end mapper;

  --   -- The reducer emits words and the number of times they're seen   --   function reducer(in_cur in word_cur_t) return wordcnts_t     pipelined parallel_enable (partition in_cur by hash(word))     cluster in_cur by (word)   is     word_count wordcnt_t;     next       varchar2(4000);   begin

    word_count.count := 0;

    loop

      fetch in_cur into next;       exit when in_cur%notfound;

      if (word_count.word is null) then

        word_count.word := next;         word_count.count := word_count.count + 1;

      elsif (next <> word_count.word) then

        pipe row (word_count);         word_count.word := next;         word_count.count := 1;

      else

        word_count.count := word_count.count + 1;

      end if;

    end loop;

    if word_count.count <> 0 then       pipe row (word_count);     end if;

    return;

  end reducer;

end; /

-- Select statements

select word, count(*)  from (         select value(map_result).word word         from table(oracle_map_reduce.mapper(cursor(select a from documents), ' ')) map_result) group by (word);

select *  from table(oracle_map_reduce.reducer(               cursor(select value(map_result).word word                        from table(oracle_map_reduce.mapper(                         cursor(select a from documents), ' ')) map_result)));</pre>

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!