運用 Aggregator 模式實現 MapReduce

Stephen0304 8年前發布 | 14K 次閱讀 MapReduce Scala 算法

MapReduce是更好地利用并行計算資源來提升數據處理能力的重要算法,如今已被主流的大數據分析平臺實現,成為了大數據批量處理的主力軍。利用前面介紹的Actor特性,其實我們也可以實現一個簡易的MapReduce。

利用AKKA Actor來實現MapReduce,天生就支持并行計算(利用遠程Actor)與異步操作。為了簡便起見,本例使用了本地的Actor實現了大數據世界的Hello World,即WordCounter。

在編寫字數統計器的MapReduce之前,我們需要先分辨職責,包括:

  • 給定網頁地址,獲取指定網頁的內容
  • 對網頁內容進行分詞
  • 為每個單詞統計字數

考慮到本文的中心主題是介紹響應式編程與Actor模型,所以我們降低了案例難度,讀取的網頁內容均為英文,并簡單地以空格作為分詞的標志。由于我們需要接受客戶端的字數統計分析請求,那么要完成前面提到的職責,至少需要四個Actor:

  • WordCounterClient:發送數據分析請求
  • WordCounterServer:模擬服務端,接收數據分析請求,并最終將統計后的結果返回給WordCounterClient
  • PageContentFetcher:獲取網頁內容
  • ContentWordCounter:網頁內容的字數統計器

為了盡可能地提升性能,對于獲取網頁內容以及統計內容字數的統計工作,我們都需要多個Actor同時執行。然而,由于每個Actor處理消息都是以異步形式進行,我們 該怎樣才能知道并發處理的請求都得到了處理 ?針對字數統計器的案例而言,我們還需要將每個Actor統計獲得的字數再進行reduce,同樣也需要知道是否每條消息都已經處理完畢,并獲得處理的結果。

AKKA通過 Aggregator 特性實現了Aggregator模式,可以很好地解決剛才提到的問題。它通過引入一個單獨的聚合器Actor,用以聚合多個Actor產生的數據,并根據這些Actor對消息的Response 更新狀態

假定ContentWordCounter分析后的結果如下代碼所示:

case class AnalysisResult(wordToCount: Seq[(String, Long)])

那么,Aggregator就可以通過在其內部維持一個分析結果集(即前面所謂的狀態,代碼中的 analysisResults ),每收到一個Actor的Response,就將結果塞入到這個結果集(更新狀態)中,并判斷結果集的長度是否等于要處理的網頁數,以此作為消息是否處理完畢的條件。整個Aggregator的實現如下:

class WordCounterAggregator extends Actor with Aggregator {  expectOnce {
    case StartAggregation(target, urls) =>
      new Handler(target, urls, sender)
    case _ =>
      sender ! BadCommand
      context stop self
  }
  class Handler(target: ActorRef, urls: Seq[String], originalSender: ActorRef) {
    var analysisResults = Set.empty[AnalysisResult]
    context.system.scheduler.scheduleOnce(10.seconds, self, Timeout)
    expect {
      case Timeout =>
        respondIfDone(respondAnyway = true)
    }
    urls.foreach { uri =>
      target ! FetchPageContent(uri)
      expectOnce {
        case result: AnalysisResult =>
          analysisResults += result
          respondIfDone()
      }
    }
    def respondIfDone(respondAnyway: Boolean = false) = {
      import MapSeqImplicits._
      if (respondAnyway || analysisResults.size == urls.size) {
        val wordToCounts = analysisResults.flatMap(_.wordToCount).reduceByKey(_ + _)
        originalSender ! AggregatedAnalysisResult(wordToCounts)
        context stop self
      }
    }
  }
}

WordCounterAggregator繼承了 Aggregator 特性,這個特性已經對Actor的 receive 進行了處理,使得繼承該特性的Actor不需要重寫 receive 方法。 Aggregator 特性提供了 expect 、 expectOnce 與 unexpect ,用以接收期待處理的消息。

在Aggregator內部,其實維持了一個expectList,用以存放expect等函數所接收的偏函數。 expect 與 expectOnce 都是將偏函數放入到這個列表中,只是后者只留存一次(通過permanent標志來判定),一旦匹配了,就會將該偏函數移除,而 expect 則不會;至于 unexpect ,就是 expect 的反操作,用于將偏函數從列表中移除。

自定義的 respondIfDone 方法會在滿足聚合條件時,對分析結果進行reduce運算。Scala的集合庫自身并沒有提供 reduceByKey() 函數,是我模仿Spark的RDD自行編寫的隱式轉換方法:

object MapSeqImplicits {
  implicit class MapSeqWrapper(wordToCount: Iterable[(String, Long)]) {
    def reduceByKey(f: (Long, Long) => Long): Seq[(String, Long)] = {
      wordToCount.groupBy(_._1).map {
        case (word, counts) => (word, counts.map(_._2).foldLeft(0L)(f))      
      }.toSeq
    }
 }
}

因為引入了一個Aggregator,消息的處理以及Actor之間的協作就變得相對復雜。要進行響應式編程,其中一個關鍵就是要理清楚數據(或消息)的流動方向,并分辨每個數據處理器的職責。我們可以借助類似狀態圖之類的可視化工具幫助我們分析數據流動模型。下圖是本例的一個消息處理模型,它同時還表達了Actor之間的協作關系。

Actor之間的協作

執行字數統計的流程如下所示:

  • 首先,WordCounterClient接收StartAnalysisWebPages消息,準備分析網頁;
  • 由于Client沒有這個“能力”完成分析任務,于是求助于WordCounterServer,并發起FetchWebPages消息,要求獲取網頁內容;
  • WordCounterServer同樣是個憊懶貨色,什么都不干,轉手就將這件事情轉交給別的Actor了,所以他其實就是一個前臺接待員。如果不需要聚合,它收到的FetchWebPages其實應該交給PageContentFetcher,但現在須得經由WordCounterAggregator來分配請求;所以從另外一個角度來看,這個Aggregator相當于是一個Mediator;
  • 由于Aggregator是一個Mediator,因此它會協調多個PageContentFetcher與ContentWordCounter來并行完成任務;因而Aggregator和這兩個Actor之間是一對多關系,而PageContentFetcher與ContentWordCounter則屬于一對一關系。當PageContentFetcher獲得了網頁內容后,就通過CountPageContent消息,將統計字數的職責交給了ContentWordCounter;
  • ContentWordCounter在計算完當前網頁的字數后,會將分析結果AnalysisResult返回給Aggregator,并由其完成分析結果的reduce運算,并返回AggregatedAnalysisResult結果給Server;
  • 最后,Server再將Client需要的最終結果返回給Client。

由于Aggregator需要協調多個Fetcher與Counter的Actor,以支持異步并行計算(本例實則是并發計算)的需要,我為其引入了AKKA提供的Router Actor。通過Router可以創建一個容器Actor,內部管理多個worker rootees,并提供了RoundRobin、Random、Boardcast等多種路由形式,用戶可以根據Actor的負載情況選擇不同的路由方式。

這里,我選擇使用RoundRobin以硬編碼的形式創建了Router Actor:

val analyst: ActorRef = context.actorOf(Props(new ContentWordCounter(aggregator)), "PageContentAnalyst") 
val fetchers = context.actorOf(RoundRobinPool(4).props(Props(new PageContentFetcher(analyst))), "fetchers")

整體來看,PageContentFetcher與ContentWordCounter其實扮演的是map角色,并通過Router Actor來實現map工作的異步并發處理;而WordCounterAggregator則扮演了reduce角色,它負責將收到的多個分析結果進行reduce運算。

由于缺乏對MapReduce算法必要的封裝,用AKKA Actor實現的MapReduce顯得比較復雜,但卻較好地體現了響應式編程的異步數據流本質。

當我們在使用Actor來處理異步消息傳遞時,當業務漸趨復雜后,我們常常會迷失在復雜的消息傳遞網中而無法自拔。為了保持清醒的頭腦,需要時刻謹記Actor的職責。以我的經驗,我們應該考慮:

  • 從Actor扮演的角色來思考它應該接收什么樣的消息;
  • Actor對消息的處理一定要滿足單一職責原則,正確地履行職責,也當在合適時候正確地轉移職責;
  • 運用狀態圖幫助思考Actor與其他Actor之間的協作關系;
  • 正確理解AKKA Actor的消息發送機制,當在Actor內部再次發送消息時,是由sender發送,還是通過消息傳遞過來的actorRef對象發送消息。

要完成多個網頁的字數統計功能,除了使用稍顯復雜的Actor模式之外,我們也可以直接使用scala提供的并行集合來完成,代碼更為精簡:

val words = for {
 url <- urls.par
 line <- scala.io.Source.fromURL(url).getLines()
 word <- line.split(" ")
} yield (word)
val analysisResult = words.map(w => (w, 1L)).reduceByKey(_ + _)

在業務相對簡單,并不需要非阻塞消息處理,也沒有可伸縮性需求的時候,若能恰當運用scala自身提供的par集合會是好的選擇。

事實上,為了實現字數統計的功能,采用AKKA提供的Aggregator確乎有些過度。它更擅長于通過將職責分治與合理運用基于消息的Actor模式來完成更為復雜的響應式系統。WordCounter的例子不外乎是我為了更好地解釋Aggregator模式而給出的一個Demo罷了。

 

 

來自:http://www.jianshu.com/p/0ffef9fead84

 

 本文由用戶 Stephen0304 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!