A Comparison of Operator Layers in Common Computing Frameworks


Background


A while ago I was designing the operator layer for a computing framework we are building in-house. As references, I surveyed and compared the operator layers of several open-source computing frameworks; this post is a coarse-grained write-up of that comparison.

The figure below is my breakdown of the abstraction layers of a computing framework; for details, see the Spark SQL slides I presented at last Sunday's Hangzhou Spark meetup.


Pig Latin


A DSL on top of Hadoop MapReduce: procedural, and well suited to large-scale data analysis. The syntax is elegant; unfortunately it is only really a fit for the CLI.

 
    A = LOAD 'xx' AS (c1:int, c2:chararray, c3:float);
    B = GROUP A BY c1;
    C = FOREACH B GENERATE group, COUNT(A);
    D = FOREACH B GENERATE $0, $1.c2;

    X = COGROUP A BY a1, B BY b1;
    Y = JOIN A BY a1 LEFT OUTER, B BY b1;    -- also RIGHT | FULL

Cascading


A wrapper on top of Hadoop MapReduce; Twitter's Summingbird is built on Cascading. Every operator is created with new, and the Pipe instance is passed "iteratively" into each new operator.

 
// define source and sink Taps.  
Scheme sourceScheme = new TextLine( new Fields( "line" ) );  
Tap source = new Hfs( sourceScheme, inputPath );  
Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );  
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );  

// the 'head' of the pipe assembly  
Pipe assembly = new Pipe( "wordcount" );  

// For each input Tuple  
// parse out each word into a new Tuple with the field name "word"  
// regular expressions are optional in Cascading  
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";  
Function function = new RegexGenerator( new Fields( "word" ), regex );  
assembly = new Each( assembly, new Fields( "line" ), function );  

// group the Tuple stream by the "word" value  
assembly = new GroupBy( assembly, new Fields( "word" ) );  

// For every Tuple group  
// count the number of occurrences of "word" and store result in  
// a field named "count"  
Aggregator count = new Count( new Fields( "count" ) );  
assembly = new Every( assembly, count );  

// initialize app properties, tell Hadoop which jar file to use  
Properties properties = new Properties();  
AppProps.setApplicationJarClass( properties, Main.class );  

// plan a new Flow from the assembly using the source and sink Taps  
// with the above properties  
FlowConnector flowConnector = new HadoopFlowConnector( properties );  
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );  

// execute the flow, block until complete  
flow.complete();

 

Trident


Provides high-level abstract primitives on top of Storm, carrying over the exactly-once semantics of Transactional Topology to give transactional guarantees. The primitives are rather too abstract, though, and building a topology involves a lot of repetitive field declarations.

 
    TridentState urlToTweeters =  
           topology.newStaticState(getUrlToTweetersState());  
    TridentState tweetersToFollowers =  
           topology.newStaticState(getTweeterToFollowersState());  

    topology.newDRPCStream("reach")  
           .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))  
           .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))  
           .shuffle()  
           .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))  
           .parallelismHint(200)  
           .each(new Fields("followers"), new ExpandList(), new Fields("follower"))  
           .groupBy(new Fields("follower"))  
           .aggregate(new One(), new Fields("one"))  
           .parallelismHint(20)  
           .aggregate(new Count(), new Fields("reach"));   


RDD


Spark's resilient distributed dataset, which comes with a rich set of primitives. The flexibility of the RDD primitives is owed to Scala's functional nature and syntactic sugar, and their richness mirrors the richness of Scala's own collection API. Java can hardly match this level of expressiveness, but RDDs are nonetheless a very valuable reference.

 
    scala> val textFile = sc.textFile("README.md")  
    textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3  

    scala> textFile.count() // Number of items in this RDD  
    res0: Long = 126  

    scala> textFile.first() // First item in this RDD  
    res1: String = # Apache Spark  

    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))  
    linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09  

    scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?  
    res3: Long = 15  

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)  
    res4: Long = 15  

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  
    wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8  

    scala> wordCounts.collect()  
    res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)   
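
To make that expressiveness gap concrete, here is a rough sketch of the same word count written against the Spark 1.x Java API, where every closure becomes an anonymous class; the class name and file path are illustrative and not taken from the original post.

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Illustrative sketch only: the Scala one-liner above turns into ~20 lines of Java.
public class JavaWordCount {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "JavaWordCount");
    JavaRDD<String> textFile = sc.textFile("README.md");

    // Split each line into words.
    JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    });

    // Pair each word with a count of 1.
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String word) {
        return new Tuple2<String, Integer>(word, 1);
      }
    });

    // Sum the counts per word.
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    System.out.println(counts.collect());
    sc.stop();
  }
}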


SchemaRDD


The "table"-like RDD in Spark SQL, which additionally provides a DSL for SQL. This DSL only fits SQL-style queries, though; its expressiveness is limited and rather "vertical".

 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
    // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.  
    import sqlContext.createSchemaRDD  

    // Define the schema using a case class.  
    case class Person(name: String, age: Int)  

    // Create an RDD of Person objects and register it as a table.  
    val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))  
    people.registerAsTable("people")  

    // SQL statements can be run by using the sql methods provided by sqlContext.  
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")  

    // DSL: where(), select(), as(), join(), limit(), groupBy(), orderBy() etc.  
    val teenagers = people.where('age >= 10).where('age <= 19).select('name)  
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)   

Apache Crunch


An open-source implementation of Google's FlumeJava paper and a textbook operator layer; it now supports both Hadoop and Spark jobs.
Crunch follows the FlumeJava design: it implements PCollection and PTable as distributed, immutable data representations, provides the four basic primitives parallelDo(), groupByKey(), combineValues(), and flatten(), and derives operations such as count(), join(), and top() from them. It also implements deferred evaluation and the optimization of MSCR (MapShuffleCombineReduce) operations.
Writing Crunch jobs depends heavily on Hadoop; in essence, Crunch exists to write MapReduce pipelines on a batch computing framework. Its primitives are not especially rich, and parallelDo() is not a great fit for a streaming context. Many of its features are things we do not need, but its abstract data representations, interface model, and flow control are worth borrowing. A sketch of how count() decomposes into the basic primitives follows the example below.

 
    public class WordCount extends Configured implements Tool, Serializable {  
      public int run(String[] args) throws Exception {  
        // Create an object to coordinate pipeline creation and execution.  
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());  
        // Reference a given text file as a collection of Strings.  
        PCollection<String> lines = pipeline.readTextFile(args[0]);  

        PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {  
          public void process(String line, Emitter<String> emitter) {  
            for (String word : line.split("\\s+")) {  
              emitter.emit(word);  
            }  
          }  
        }, Writables.strings()); // Indicates the serialization format  

        PTable<String, Long> counts = words.count();  
        // Instruct the pipeline to write the resulting counts to a text file.  
        pipeline.writeTextFile(counts, args[1]);  
        // Execute the pipeline as a MapReduce.  
        PipelineResult result = pipeline.done();  

        return result.succeeded() ? 0 : 1;  
      }  

      public static void main(String[] args) throws Exception {  
        int result = ToolRunner.run(new Configuration(), new WordCount(), args);  
        System.exit(result);  
      }  
    }   
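
As a rough illustration of how the derived operations build on the basic primitives, the count() call above could be spelled out along the following lines with parallelDo(), groupByKey(), and combineValues(). This is a sketch against the Crunch API as I understand it (it assumes additional imports for MapFn, Pair, and Aggregators), not the actual implementation of count().

    // Inside run() above: a rough equivalent of words.count(),
    // expressed with the basic primitives. Illustrative sketch only.
    PTable<String, Long> ones = words.parallelDo(
        new MapFn<String, Pair<String, Long>>() {
          public Pair<String, Long> map(String word) {
            return Pair.of(word, 1L);
          }
        },
        Writables.tableOf(Writables.strings(), Writables.longs()));

    // groupByKey() shuffles by word; combineValues() sums the ones per word,
    // acting both as a map-side combiner and in the reduce.
    PTable<String, Long> counts = ones.groupByKey()
        .combineValues(Aggregators.SUM_LONGS());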


Summary


This last figure compares the implementation layers of the various data pipeline projects built on top of Hadoop:



The end :)

Source: http://blog.csdn.net/pelick/article/details/39076223
