Apache Crunch:簡化MapReduce編程的Java庫

jopen 11年前發布 | 18K 次閱讀 分布式/云計算/大數據 Apache Crunch

Apache Crunch:簡化MapReduce編程的Java庫
 Apache Crunch(孵化器項目)是基于Google的FlumeJava庫 編寫的Java庫,用于創建MapReduce流水線。與其他用來創建MapReduce作業的高層工具(如Apache Hive、Apache Pig和Cascading等)類似,Crunch提供了用于實現如連接數據、執行聚合和排序記錄等常見任務的模式庫。而與其他工具不同的 是,Crunch并不強制所有輸入遵循同一數據類型。相反,Crunch使用了一種定制的類型系統,非常靈活,能夠直接處理復雜數據類型,如時間序列、 HDF5文件、Apache HBase表和序列化對象(像protocol buffer或Avro記錄)等。

Crunch并不想阻止開發者以MapReduce方式思考,而是嘗試使之簡化。盡管MapReduce有諸多優點,但對很多問題而言,并非正確 的抽象級別:大部分有意思的計算都是由多個MapReduce作業組成的,情況往往是這樣——出于性能考慮,我們需要將邏輯上獨立的操作(如數據過濾、數 據投影和數據變換)組合為一個物理上的MapReduce作業。

</div>

本質上,Crunch設計為MapReduce之上的一個薄層,希望在不犧牲MapReduce力量(或者說不影響開發者使用MapReduce API)的前提下,更容易在正確的抽象級別解決手頭問題。

盡管Crunch會讓人想起歷史悠久的Cascading API,但是它們各自的數據模型有很大不同:按照常識簡單總結一下,可以認為把問題看做數據流的人會偏愛Crunch和Pig,而考慮SQL風格連接的人會偏愛Cascading和Hive。

Crunch的理念

PCollection和PTable是Crunch的核心抽象,前者代表一個分布式、不可變的對象集合,后者是Pcollection的一個子接口,其中包含了處理鍵值對的額外方法。這兩個核心類支持如下四個基本操作:

  1. parallelDo:將用戶定義函數應用于給定PCollection,返回一個新的PCollection作為結果。
  2. groupByKey:將一個PTable中的元素按照鍵值排序并分組(等同于MapReduce作業中的shuffle階段)
  3. combineValues:執行一個關聯操作來聚合來自groupByKey操作的值。
  4. union:將兩個或多個Pcollection看做一個虛擬的PCollection。
  5. </ol>

    Crunch的所有高階操作(joins、cogroups和set operations等)都是通過這些基本原語實現的。Crunch的作業計劃器(job planner)接收流水線開發者定義的操作圖,將操作分解為一系列相關的MapReduce作業,然后在Hadoop集群上執行。Crunch也支持內 存執行引擎,可用于本地數據上流水線的測試與調試。

    有些問題可以從能夠操作定制數據類型的大量用戶定義函數受益,而Crunch就是為這種問題設計的。Crunch中的用戶定義函數設計為輕量級的, 為滿足應用程序的需要,仍然提供了完整的訪問底層MapReduce API的功能。Crunch開發者也可以使用Crunch原語來定義API,為客戶提供涉及一系列復雜MapReduce作業的高級ETL、機器學習和科 學計算功能。

    Crunch起步

    可以從Crunch的網站下載最新版本的源代碼或二進制文件,或者使用在Maven Central發布的dependencies

    源代碼中有很多示例應用。下面是Crunch中WordCount應用的源代碼:

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.type.writable.Writables;

    public class WordCount { public static void main(String[] args) throws Exception { // Create an object to coordinate pipeline creation and execution. Pipeline pipeline = new MRPipeline(WordCount.class); // Reference a given text file as a collection of Strings. PCollection lines = pipeline.readTextFile(args[0]);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection words = lines.parallelDo(new DoFn() {
      public void process(String line, Emitter emitter) {
    for (String word : line.split("\\s+")) {
      emitter.emit(word);
    }
      }
    }, Writables.strings()); // Indicates the serialization format
    
    // The count method applies a series of Crunch primitives and returns
    // a map of the top 20 unique words in the input PCollection to their counts.
    // We then read the results of the MapReduce jobs that performed the
    // computations into the client and write them to stdout.
     for (Pair wordCount : words.count().top(20).materialize()) {
      System.out.println(wordCount);
     }
    

    } }</pre>

    Crunch優化方案

    Crunch優化器的目標是盡可能減少運行的MapReduce作業數。大多數MapReduce作業都是 IO密集型的,因此訪問數據的次數越少越好。公平地說,每種優化器(Hive、Pig、Cascading和Crunch)的工作方式本質上是相同的。但 與其他框架不同的是,Crunch把優化器原語暴露給了客戶開發人員,對于像構造ETL流水線或構建并評估一組隨機森林模型這樣的任務而言,構造可復用的 高階操作更容易。

    結論

    Crunch目前仍處于Apache的孵化器階段,我們非常歡迎社區貢獻(參見項目主頁)讓這個庫更好。特別的是,我們正在尋求更高效的MapReduce編譯思想(包括基于成本考慮的優化)、新的MapReduce設計模式,還希望支持更多的數據源和目標,如HCatalog、Solr和ElasticSearch等。還有很多把Crunch帶向如ScalaClojure等其他JVM語言的項目,也有很多使用Crunch以R語言來創建MapReduce流水線的工具。

    查看英文原文:Apache Crunch: A Java Library for Easier MapReduce Programming

    來源:InfoQ

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!