讓并發和容錯更容易:Akka示例教程(譯文)
BY DIAGO CASTORINA
翻譯:自由的檸檬
原文鏈接:http://www.toptal.com/scala/concurrency-and-fault-tolerance-made-easy-an-intro-to-akka
挑戰
寫并發程序很難。程序員不得不處理線程、鎖和競態條件等等,這個過程很容易出錯,而且會導致程序代碼難以閱讀、測試和維護。
所以,很多人不傾向于使用多線程編程。取而代之的是,他們使用單線程進程(譯者注:只含有一個線程的進程),依賴外部服務(如數據庫、隊列等)處理所需的并發或異步操作。雖然這種方法在有些情況下是可行的,但還有很多其他情況不能奏效。很多實時系統——例如交易或銀行業務應用,或實時游戲——等待一個單線程進程完成就太奢侈了(他們需要立即應答!)。其他的一些對于計算或資源要求非常高的系統,如果在程序中不引入并行機制就會耗時很久(有些情況下可以達到幾個小時或數天)。
常用的一種單線程方法(例如,在Node.js里廣泛應用)是使用基于事件的、非阻塞模式(event-based, non-blocking paradigm,其中paragidigm也有譯作成例)。雖然這種方法可以避免上下文切換、鎖和阻塞,的確能提高性能,但還是不能解決并發使用多個處理器(需要啟動和協調多個獨立的處理器)的問題。
那么,這是不是意味著為了構建一個并發程序,除了深入到線程、鎖和競態條件之外沒有別的選擇呢?
感謝Akka框架,它為我們提供了一種選擇。本教程介紹了Akka的示例,并仔細研究它如何幫助并簡化分布式并發應用的實現。
Akka框架是什么
這篇文章介紹了Akka并仔細研究它如何幫助并簡化分布式并發應用的實現。
Akka是JVM(Java虛擬機,下同)平臺上構建高并發、分布式和容錯應用的工具包和運行時。Akka用Scala語言寫成,同時提供了Scala和Java的開發接口。
Akka處理并發的方法基于Actor(沒有慣用譯法,文中使用原詞)模型。在基于Actor的系統里,所有的事物都是actor,就好像在面向對象設計里面所有的事物都是對象一樣。但是有一個重要區別——特別是和我們的討論相關的——那就是Actor模型是作為一個并發模型設計和架構的,而面向對象模式則不是。更具體一點,在Scala的actor系統里,actor互相交互并共享信息但并不對交互順序作出預設。Actor之間共享信息和發起任務的機制是消息傳遞。
創建和調度線程、接收和分發消息以及處理競態條件和同步的所有復雜性,都委托給框架,框架的處理對應用來說是透明的。
Akka在多個actor和下面的系統之間建立了一個層次(layer),這樣一來,actor只需要處理消息就可以了。創建和調度線程、接收和分發消息以及處理競態條件和同步的所有復雜性,都委托給框架,框架的處理對應用來說是透明的。
Actor嚴格遵守響應式聲明。響應式應用的目標是通過滿足以下一個或多個條件來代替傳統的多線程應用:
- 事件驅動。使用Actor,代碼可以異步處理請求并用獨占的方式執行非阻塞操作。
- 可伸縮性。在Akka里,不修改代碼就增加節點是可能的,感謝消息傳遞和本地透明性(location transparency)。
- 高彈性。任何應用都會碰到錯誤并在某個時間點失敗。Akka的“監管”(容錯)策略為實現自愈系統提供了便利。
- 響應式。今天的高性能和快速響應應用需要對用戶快速反饋,因此對于事件的響應需要非常及時。Akka的非阻塞、基于消息的策略可以幫助達成這個目標。
Akka中的Actor是什么
Actor本質上就是接收消息并采取行動處理消息的對象。它從消息源中解耦出來,只負責正確識別接收到的消息類型,并采取相應的行動。
收到一條消息之后,一個actor可能會采取以下一個或多個行動:
- 執行一些本身的操作(例如進行計算、持久化數據、調用外部的Web服務等)
- 把消息或衍生消息轉發給另外一個actor
- 實例化一個新的actor并把消息轉發給它
或者,如果這個actor認為合適的話,可能會完全忽略這條消息(也就是說,它可能選擇不響應)。
為了實現一個actor,需要繼承akka.actor.Actor這個trait(一般譯為“特征”,譯法有一定爭議,文中保留原詞)并實現receive方法。當一個消息發送給Actor時,它的receive方法會被(Akka)調用。典型的實現包括使用模式匹配(pattern matching)來識別消息類型并作出響應,參見下面的Akka示例:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
def receive = {
case value: String => doSomething(value)
case _ => println("received unknown message")
}
}
模式匹配是一種相對優雅的處理消息的技術,相比基于回調的實現,更傾向于產生“更整潔”以及更容易瀏覽的代碼。例如,考慮一個簡化版的HTTP請求/響應實現。
首先,我們使用JavaScript中基于回調的方式實現:
route(url, function(request){
var query = buildQuery(request);
dbCall(query, function(dbResponse){
var wsRequest = buildWebServiceRequest(dbResponse);
wsCall(wsRequest, function(wsResponse) {
sendReply(wsResponse);
});
});
});
現在,我們把它和基于模式匹配的實現做個比較:
msg match {
case HttpRequest(request) => {
val query = buildQuery(request)
dbCall(query)
}
case DbResponse(dbResponse) => {
var wsRequest = buildWebServiceRequest(dbResponse);
wsCall(dbResponse)
}
case WsResponse(wsResponse) => sendReply(wsResponse)
}
雖然基于回調的JavaScript代碼更緊湊,但確實更難以閱讀和瀏覽。相比而言,基于模式匹配的代碼對于需要考慮哪些情況、每種情況都是怎么處理的寫法更加清晰。
Actor系統
把一個復雜的問題不斷分解成更小規模的子問題通常是一種可靠的解決問題的技術。這個方法對于計算機科學特別有效(和單一職責原則一致),因為這樣容易產生整潔的、模塊化的代碼,產生的冗余很少甚至沒有,而且維護起來相對容易。
在基于actor的設計里,使用這種技術有助于把actor的邏輯組織變成一個層級結構,也就是所謂的Actor系統。Actor系統提供了一個基礎框架,通過這個系統actor之間可以進行交互。
在Akka里面,和actor通信的唯一方式就是通過ActorRef
。ActorRef
代表actor的一個引用,可以阻止其他對象直接訪問或操作這個actor的內部信息和狀態。消息可以通過一個ActorRef
以下面的語法協議中的一種發送到一個actor:
-!
(“告知”) —— 發送消息并立即返回
-?
(“請求”) —— 發送消息并返回一個Future對象,代表一個可能的應答
每個actor都有一個收件箱,用來接收發送過來的消息。收件箱有多種實現方式可以選擇,缺省的實現是先進先出(FIFO)隊列。
在處理多條消息時,一個actor包含多個實例變量來保持狀態。Akka確保actor的每個實例都運行在自己的輕量級線程里,并保證每次只處理一條消息。這樣一來,開發者不必擔心同步或競態條件,而每個actor的狀態都可以被可靠地保持。
Akka的Actor API中提供了每個actor執行任務所需要的有用信息:
sender
:當前處理消息的發送者的一個ActorRef
引用context
:actor運行上下文相關的信息和方法(例如,包括實例化一個新actor的方法actorOf
)supervisionStrategy
:定義用來從錯誤中恢復的策略self
:actor本身的ActorRef
引用
Akka確保actor的每個實例都運行在自己的輕量級線程里,并保證每次只處理一條消息。這樣一來,開發者不必擔心同步或競態條件,而每個actor的狀態都可以被可靠地保持。
為了把這些教程組織起來,讓我們來考慮一個簡單的例子:統計一個文本文件中單詞的數量。
為了達到演示Akka示例的目的,我們把這個問題分解為兩個子任務;即,(1)統計每行單詞數量的“孩子”任務和(2)匯總這些單行單詞數量、得到文件里單詞總數的“父親”任務。
父actor會從文件中裝載每一行,然后委托一個子actor來計算某一行的單詞數量。當子actor完成之后,它會把結果用消息發回給父actor。父actor會收到(每一行的)單詞數量的消息并維持一個整個文件單詞總數的計數器,這個計數器會在完成后返回給調用者。
(注意以下提供的Akka教程的例子只是為了教學目的,所以沒有顧及所有的邊界條件、性能優化等。同時,完整可編譯版本的代碼示例可以在這個gist中找到)
讓我們首先看一個子類StringCounterActor
的示例實現:
case class ProcessStringMsg(string: String)
case class StringProcessedMsg(words: Integer)
class StringCounterActor extends Actor {
def receive = {
case ProcessStringMsg(string) => {
val wordsInLine = string.split(" ").length
sender ! StringProcessedMsg(wordsInLine)
}
case _ => println("Error: message not recognized")
}
}
這個actor有一個非常簡單的任務:接收ProcessStringMsg
消息(包含一行文本),計算這行文本中單詞的數量,并把結果通過一個StringProcessedMsg
消息返回給發送者。請注意我們已經實現了我們的類,使用!
(“告知”)方法發出StringProcessedMsg
消息(發出消息并立即返回)。
好了,現在我們來關注父WordCounterActor
類:
case class StartProcessFileMsg()
2.
3. class WordCounterActor(filename: String) extends Actor {
4.
5. private var running = false
6. private var totalLines = 0
7. private var linesProcessed = 0
8. private var result = 0
9. private var fileSender: Option[ActorRef] = None
10.
11. def receive = {
12. case StartProcessFileMsg() => {
13. if (running) {
14. // println just used for example purposes;
15. // Akka logger should be used instead
16. println("Warning: duplicate start message received")
17. } else {
18. running = true
19. fileSender = Some(sender) // save reference to process invoker
20. import scala.io.Source._
21. fromFile(filename).getLines.foreach { line =>
22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
23. totalLines += 1
24. }
25. }
26. }
27. case StringProcessedMsg(words) => {
28. result += words
29. linesProcessed += 1
30. if (linesProcessed == totalLines) {
31. fileSender.map(_ ! result) // provide result to process invoker
32. }
33. }
34. case _ => println("message not recognized!")
35. }
36. }
這里面有很多細節,我們來逐一考察(注意討論中所引用的行號基于以上代碼示例)。
首先,請注意要處理的文件名被傳給了WordCounterActor
的構造方法(第3行)。這意味著這個actor只會用來處理一個單獨的文件。這樣通過避免重置狀態變量(running
,totalLines
,linesProcessed
和result
)也簡化了開發者的編碼工作,因為這個實例只使用一次(也就是說處理一個單獨的文件),然后就丟棄了。
接下來,我們看到WordCounterActor
處理了兩種類型的消息:
StartProcessFileMsg
(第12行)- 從最初啟動
WordCounterActor
的外部actor接收到的消息 - 收到這個消息之后,
WordCounterActor
首先檢查它收到的是不是一個重復的請求 - 如果這個請求是重復的,那么
WordCounterActor
生成一個警告,然后就不做別的事了(第16行) - 如果這不是一個重復的請求:
WordCounterActor
在fileSender
實例變量(注意這是一個Option[ActorRef]
而不是一個Option[Actor]
)中保存發送者的一個引用。當處理最終的StringProcessedMsg
(從一個StringCounterActor
子類中接收,如下文所述)時,為了以后的訪問和響應,這個ActorRef
是必需的。- 然后
WordCounterActor
讀取文件,當文件中每行都裝載之后,就會創建一個StringCounterActor
,需要處理的包含行文本的消息就會傳遞給它(第21-24行)。
- 從最初啟動
StringProcessedMsg
(第27行)- 當處理完成分配給它的行之后,從
StringCounterActor
處接收到的消息 - 收到此消息之后,
WordCounterActor
會把文件的行計數器增加,如果所有的行都處理完畢(也就是說,當totalLines
和linesProcessed
相等),它會把最終結果發給原來的fileSender
(第28-31行)。
- 當處理完成分配給它的行之后,從
再次需要注意的是,在Akka里,actor之間通信的唯一機制就是消息傳遞。消息是actor之間唯一共享的東西,而且因為多個actor可能會并發訪問同樣的消息,所以為了避免競態條件和不可預期的行為,消息的不可變性非常重要。
因為Case class默認是不可變的并且可以和模式匹配無縫集成,所以用case class的形式來傳遞消息是很常見的。(Scala中的Case class就是正常的類,唯一不同的是通過模式匹配提供了可以遞歸分解的機制)。
讓我們通過運行整個應用的示例代碼來結束這個例子。
object Sample extends App {
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.dispatch.ExecutionContexts._
implicit val ec = global
override def main(args: Array[String]) {
val system = ActorSystem("System")
val actor = system.actorOf(Props(new WordCounterActor(args(0))))
implicit val timeout = Timeout(25 seconds)
val future = actor ? StartProcessFileMsg()
future.map { result =>
println("Total number of words " + result)
system.shutdown
}
}
}
請注意這里的?
方法是怎樣發送一條消息的。用這種方法,調用者可以使用返回的Future對象,當完成之后可以打印出最后結果并最終通過停掉Actor系統退出程序。
Akka的容錯和監管者策略
在actor系統里,每個actor都是其子孫的監管者。如果actor處理消息時失敗,它就會暫停自己及其子孫并發送一個消息給它的監管者,通常是以異常的形式。
在Akka里面,監管者策略是定義你的系統容錯行為的主要并且直接的機制。
在Akka里面,一個監管者對于從子孫傳遞上來的異常的響應和處理方式稱作監管者策略。監管者策略是定義你的系統容錯行為的主要并且直接的機制。
當一條消息指示有一個錯誤到達了一個監管者,它會采取如下行動之一:
- 恢復孩子(及其子孫),保持內部狀態。 當孩子的狀態沒有被錯誤破壞,還可以繼續正常工作的時候,可以使用這種策略。
- 重啟孩子(及其子孫),清除內部狀態。 這種策略應用的場景和第一種正好相反。如果孩子的狀態已經被錯誤破壞,在它可以被用到Future之前有必須要重置其內部狀態。
- 永久地停掉孩子(及其子孫)。 這種策略可以用在下面的場景中:錯誤條件不能被修正,但是并不影響后面執行的操作,這些操作可以在失敗的孩子不存在的情況下完成。
- 停掉自己并向上傳播錯誤。 適用場景:當監管者不知道如何處理錯誤,就把錯誤傳遞給自己的監管者。
而且,一個Actor可以決定是否把行動應用在失敗的子孫上抑或是應用到它的兄弟上。有兩種預定義的策略:
OneForOneStrategy
:只把指定行動應用到失敗的孩子上AllForOneStrategy
:把指定行動應用到所有子孫上
下面是一個使用OneForOneStrategy
的簡單例子:
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
override val supervisorStrategy =
OneForOneStrategy() {
case _: ArithmeticException => Resume
case _: NullPointerException => Restart
case _: IllegalArgumentException => Stop
case _: Exception => Escalate
}
如果沒有指定策略,那么就使用如下默認的策略:
- 如果在初始化actor時出錯,或者actor被結束(killed),那么actor就會停止(stopped)
- 如果有任何類型的異常出現,actor就會重啟
Akka提供的默認策略的實現如下:
final val defaultStrategy: SupervisorStrategy = {
def defaultDecider: Decider = {
case _: ActorInitializationException ? Stop
case _: ActorKilledException ? Stop
case _: Exception ? Restart
}
OneForOneStrategy()(defaultDecider)
}
Akka也考慮到對定制化監管者策略的實現,但正如Akka文檔也提出了警告,這么做要小心,因為錯誤的實現會產生諸如actor系統被阻塞的問題(也就是說,其中的多個actor被永久掛起了)。
本地透明性
Akka架構支持本地透明性,使得actor完全不知道他們接受的消息是從哪里發出來的。消息的發送者可能駐留在同一個JVM,也有可能是存在于其他的JVM(或者運行在同一個節點,或者運行在不同的節點)。Akka處理這些情況對于actor(也即對于開發者)來說是完全透明的。唯一需要說明的是跨越節點的消息必須要被序列化。
Akka架構支持本地透明性,使得actor完全不知道他們接受的消息是從哪里發出來的。
Actor系統設計的初衷,就是不需要任何專門的代碼就可以運行在分布式環境中。Akka只需要一個配置文件(application.conf),用以說明發送消息到哪些節點。下面是配置文件的一個例子:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
port = 2552
}
}
}
最后的一些提示
我們已經了解了Akka框架幫助完成并發和高性能的方法。然而,正如這篇教程指出的,為了充分發揮Akka的能力,在設計和實現系統時,有些要點值得考慮:
- 我們應盡最大可能為每個actor都分配最小的任務(如上面討論的,遵守單一職責原則)
- Actor應該異步處理事件(也就是處理消息),不應該阻塞,否則就會發生上下文切換,影響性能。具體來說,最好是在一個Future對象里執行阻塞操作(例如IO),這樣就不會阻塞actor,如:
case evt => blockingCall() // BAD
case evt => Future {
blockingCall() // GOOD
}
- 要確認你的消息都是不可變的,因為互相傳遞消息的actor都在它們自己的線程里并發運行。可變的消息很有可能導致不可預期的行為。
- 由于在節點之間發送的消息必須是可序列化的,所以必須要記住消息體越大,序列化、發送和反序列化所花費的時間就越多,這也會降低性能。
結論
Akka用Scala語言寫成,簡化并為開發高并發、分布式和容錯式應用提供了便利,對開發者隱藏了很大程度的復雜性。把Akka用好肯定需要了解比這個教程更多的內容,但是希望這里的介紹和示例能夠引起你的注意并繼續了解Akka。
Amazon、VMWare和CSC只是現在積極使用Akka的一部分領軍企業。可以訪問Akka的官方網站學到更多的知識,并多花點時間研究Akka是否適合你的項目。
來自:http://hongbinzuo.github.io/2014/12/16/Akka-Tutorial-with-Code-Conncurrency-and-Fault-Tolerance/