Voidbox: Docker on YARN在Hulu的實現
原文出處: IBM - Dennis Sosnoski
本 系列 中以前的文章介紹了如何通過以下方式實現并發性:
- 并行地在多個數據集上執行相同的操作(就像 Java 8 流一樣)
- 顯式地將計算構建成異步執行某些操作,然后將結果組合在一起(就像 future 一樣)。 </ul>
這兩種方法都是實現并發性的不錯方式,但是您必須將它們明確地設計到應用程序中。
在本文和接下來的幾篇文章中,我將著重介紹一種不同的并發性實現方法,該方法基于一種特定的程序結構,與顯式編碼方法不同。這種程序結構就是 actor 模型。您將了解如何使用 actor 模型的 Akka 實現。(Akka 是一個構建并發和分布式 JVM 應用程序的工具包和運行時。)請參閱 參考資料,獲取本文完整示例代碼的鏈接。
Actor 模型基礎知識
用于并發計算的 actor 模型基于各種稱為 actor 的原語來構建系統。Actor 執行操作來響應稱為消息 的輸入。這些操作包括更改 actor 自己的內部狀態,以及發出其他消息和創建其他 actor。所有消息都是異步交付的,因此將消息發送方與接收方分開。正是由于這種分離,導致 actor 系統具有內在的并發性:可以不受限制地并行執行任何擁有輸入消息的 actor。
在 Akka 術語中,actor 看起來就像是某種通過消息進行交互的行為神經束。像真實世界的演員一樣,Akka actor 也需要一定程度的隱私。您不能直接將消息發送給 Akka actor。相反,需要將消息發送給等同于郵政信箱的 actor 引用。然后通過該引用將傳入的消息路由到 actor 的郵箱,以后再傳送給 actor。Akka actor 甚至要求所有傳入的消息都是無菌的(或者在 JVM 術語中叫做不可變的),以免受到其他 actor 的污染。
與一些真實世界中演員的需求不同,Akka 中由于某種原因而存在一些看似強制要求的限制。使用 actor 的引用可阻止交換消息以外的任何交互,這些交互可能破壞 actor 模型核心上的解耦本質。Actor 在執行上是單線程的(不超過 1 個線程執行一個特定的 actor 實例),所以郵箱充當著一個緩沖器,在處理消息前會一直保存這些消息。消息的不可變性(由于 JVM 的限制,目前未由 Akka 強制執行,但這是一項既定的要求)意味著根本無需擔心可能影響 actor 之間各種共享的數據的同步問題;如果只有共享的數據是不可變的,那么根本不需要同步。
開始實現
現在您已大體了解了 actor 模型和 Akka 細節,是時候看些代碼了。使用 hello 作為編碼示例司空見慣,但它確實能夠幫助用戶快速、輕松地理解一種語言或系統。清單 1 顯示了 Scala 中的一個 Akka 版本。
清單 1. 簡單的 Scala hello
import akka.actor._ import akka.util._ /** Simple hello from an actor in Scala. */ object Hello1 extends App { val system = ActorSystem("actor-demo-scala") val hello = system.actorOf(Props[Hello]) hello ! "Bob" Thread sleep 1000 system shutdown class Hello extends Actor { def receive = { case name: String => println(s"Hello $name") } } }
清單 1 中的代碼分為兩個單獨的代碼段,它們都包含在Hello1應用程序項目中。第一個代碼段是 Akka 應用程序基礎架構,它:
- 創建一個 actor 系統(ActorSystem(...)行)。
- 在系統內創建一個 actor(system.actorOf(...)行,它為所創建的 actor 返回一個 actor 引用)。
- 使用 actor 引用向 actor 發送消息(hello !"Bob"行)。
- 等待一秒鐘,然后關閉 actor 系統(system shutdown行)。
system.actorOf(Props[Hello])調用是創建 actor 實例的推薦方式,它使用了專門用于Helloactor 類型的配置屬性。對于這個簡單的 actor(扮演一個小角色,只有一句臺詞),沒有配置信息,所以Props對象沒有參數。如果想在您的 actor 上設置某種配置,可專門為該 actor 定義一個其中包含了所有必要信息的Props類。(后面的示例會展示如何操作。)
hello !"Bob"語句將一條消息(在本例中為字符串Bob)發送給已創建的 actor。!運算符是 Akka 中表示將一條消息發送到 actor 的便捷方式,采用了觸發即忘的模式。如果不喜歡這種特殊的運算符風格,可使用tell()方法實現相同的功能。
第二段代碼是Helloactor 定義,以class Hello extends Actor開頭。這個特定的 actor 定義非常簡單。它定義必需的(對于所有 actor)局部函數receive,該函數實現了傳入消息的處理方式。(receive是一個局部函數,因為僅為一些輸入定義了它 — 在本例中,僅為String消息輸入定義了該函數。)為這個 actor 所實現的處理方法是,只要收到一條String消息,就使用該消息值打印一條問候語。
Java 中的 Hello
清單 2 給出了清單 1 中的 Akka Hello 在普通 Java 中的表示。
清單 2. Java 中的 Hello
import akka.actor.*; public class Hello1 { public static void main(String[] args) { ActorSystem system = ActorSystem.create("actor-demo-java"); ActorRef hello = system.actorOf(Props.create(Hello.class)); hello.tell("Bob", ActorRef.noSender()); try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ } system.shutdown(); } private static class Hello extends UntypedActor { public void onReceive(Object message) throws Exception { if (message instanceof String) { System.out.println("Hello " + message); } } } }
清單 3 顯示了來自包含 lambda 的 Java 8 的 actor 定義,以及 lambda 支持的ReceiveBuilder類所需要的導入。清單 3 或許更加緊湊,但與清單 2 大同小異。
清單 3. Java 8 的 Akka Hello 版本
import akka.japi.pf.ReceiveBuilder; ... private static class Hello extends AbstractActor { public Hello() { receive(ReceiveBuilder. match(String.class, s -> { System.out.println("Hello " + s); }). build()); } }
與清單 2 相比,清單 3 中的 Java 8 代碼使用了一個不同的基類 —AbstractActor代替UntypedActor— 而且還使用了一種不同的方式來定義消息處理方案。ReceiveBuilder類允許您使用 lambda 表達式來定義消息的處理方式,并采用了類似 Scala 的匹配語法。如果您主要在 Scala 中進行開發工作,此技術可能有助于讓 Java Akka 代碼看起來更簡潔,但使用 Java 8 特定版本的好處似乎有些微不足道。
為什么還要等待?
在主應用程序代碼中,將消息發送到 actor 之后,會有一次Thread sleep 1000形式的等待,然后才會關閉系統。您可能想知道為什么需要等待。畢竟,消息很容易處理;難道消息沒有立即傳到 actor,在hello !"Bob"語句完成時還在處理當中?
這個問題的答案很簡單:“不是”。Akka actor 是異步運行的,所以即使目標 actor 與發送方 actor 位于相同的 JVM 中,目標 actor 也絕不會立即開始執行。相反,處理該消息的線程會將消息添加到目標 actor 的郵箱中。將消息添加到郵箱中會觸發一個線程,以便從郵箱獲取該消息并調用 actor 的receive方法來處理。但從郵箱獲取消息的線程通常不同于將消息添加到郵箱的線程。
消息傳送時間和保證
“為什么還要等待?” 這一問題的簡短答案的背后是一種更深入的原理。Akka 支持 actor 遠程通信且具有位置透明性,意味著您的代碼沒有任何直接的方式來了解一個特定的 actor 是位于同一 JVM 中,還是在系統外的云中某處運行。但這兩種情況在實際操作中顯然具有完全不同的特征。
“Akka 無法保證消息將被傳送到目的地。這種無保證傳送背后的哲學原理是 Akka 的核心原理之一。 ”
一個差別與消息丟失有關。Akka 無法保證消息將被傳送到目的地,熟悉消息傳遞系統(用于連接應用程序)的開發人員可能對此很吃驚。這種無保證傳送背后的哲學原理是 Akka 的核心原理之一:針對失敗而設計。作為一種有意為之的過度簡化,可以認為傳送保證為消息傳輸系統添加了很高的復雜性,而且這些更復雜的系統有時無法按預期運行,而應用程序代碼還必須涉及恢復操作。這種原理在應用程序代碼始終自行處理傳送失敗情況時很有意義,能夠讓消息傳送系統保持簡單。
Akka 可以 保證消息最多傳送一次,而且絕不會無序地收到從一個 actor 實例發送到另一個 actor 實例的消息。但后者僅適用于特定的 actor 對,二者沒有聯系。如果 actor A 將消息發送給 actor B,這些消息絕不會被打亂順序。如果 actor A 將消息發送給 actor C,情況也是如此。但是,如果 actor B 也將消息發送給 actor C(例如將來自 A 的消息轉發給 C),B 的消息相對于來自 A 的消息而言可能是亂序的。
在 清單 1 的代碼中,消息丟失的概率非常低,因為代碼在單個 JVM 中運行,不會生成過多的消息負載。(過多的消息負載可能導致消息丟失。如果 Akka 沒有空間來存儲消息,例如它沒有備用方案,那么只能丟棄消息。)但清單 1 代碼的結構仍未對消息傳送事件做出任何假設,而且允許 actor 系統執行異步操作。
Actor 和狀態
Akka 的 actor 模型很靈活,支持所有類型的 actor。可以使用沒有狀態信息的 actor(就像Hello1示例中一樣),但這些 actor 可能等效于方法調用。添加狀態信息可實現更為靈活的 actor 函數。
清單 1 提供了一個完整的(但很普通)actor 系統示例 — 但擁有一個始終執行同一工作的 actor。每位演員都討厭反復重復同一句話,所以清單 4 向 actor 添加了一些狀態信息,使工作變得更有趣。
清單 4. Polyglot Scala hello
object Hello2 extends App { case class Greeting(greet: String) case class Greet(name: String) val system = ActorSystem("actor-demo-scala") val hello = system.actorOf(Props[Hello], "hello") hello ! Greeting("Hello") hello ! Greet("Bob") hello ! Greet("Alice") hello ! Greeting("Hola") hello ! Greet("Alice") hello ! Greet("Bob") Thread sleep 1000 system shutdown class Hello extends Actor { var greeting = "" def receive = { case Greeting(greet) => greeting = greet case Greet(name) => println(s"$greeting $name") } } }
清單 4 中的 actor 知道如何處理兩種不同類型的消息,這些消息在清單的開頭附近定義:Greeting消息和Greet消息,每條消息都包裝了一個字符串值。修改后的Helloactor 收到Greeting消息時,會將所包裝的字符串保存為greeting值。收到Greet消息時,則將已保存的 greeting 值與Greet字符串組合起來,形成最終的消息。下面在運行此應用程序時,我們可以看到在控制臺中打印出的消息(但消息不一定是按此順序出現的,因為 actor 執行順序是不確定的):
Hello Bob Hello Alice Hola Alice Hola Bob
清單 4 中并沒有太多的新代碼,所以我沒有提供其 Java 版本。您可在代碼下載內容中找到它們(參見 參考資料),名為com.sosnoski.concur.article5java.Hello2和com.sosnoski.concur.article5java8.Hello2。
屬性和交互
真正的 actor 系統會使用多個 actor 來完成工作,它們彼此發送消息來進行交互。并且常常需要為這些 actor 提供配置信息,以準備履行其具體的職責。清單 5 基于 Hello 示例中使用的技術,展示了簡化版的 actor 配置和交互。
清單 5. Actor 屬性和交互
object Hello3 extends App { import Greeter._ val system = ActorSystem("actor-demo-scala") val bob = system.actorOf(props("Bob", "Howya doing")) val alice = system.actorOf(props("Alice", "Happy to meet you")) bob ! Greet(alice) alice ! Greet(bob) Thread sleep 1000 system shutdown object Greeter { case class Greet(peer: ActorRef) case object AskName case class TellName(name: String) def props(name: String, greeting: String) = Props(new Greeter(name, greeting)) } class Greeter(myName: String, greeting: String) extends Actor { import Greeter._ def receive = { case Greet(peer) => peer ! AskName case AskName => sender ! TellName(myName) case TellName(name) => println(s"$greeting, $name") } } }
清單 5 在領導角色中包含了一個新的 actor,即Greeteractor。Greeter在Hello2示例的基礎上更進了一步,包含:
- 所傳遞的屬性,目的是配置Greeter實例
- 定義了配置屬性和消息的 Scala 配套對象(如果您有 Java 工作背景,可將這個配套對象視為與 actor 類同名的靜態 helper 類)
- 在Greeteractor 的實例間發送的消息
此代碼生成的輸出很簡單:
Howya doing, Alice Happy to meet you, Bob
如果嘗試運行該代碼幾次,可能會看到這些行的順序是相反的。這種排序是 Akka actor 系統動態本質的另一個例子,其中處理各個消息時的順序是不確定的(但包含我在 “消息傳送時間和保證” 中討論的幾個重要例外)。
Java 中的Greeter
清單 6 顯示了清單 5 中的 AkkaGreeter代碼的普通 Java 版本。
清單 6. Java 中的Greeter
public class Hello3 { public static void main(String[] args) { ActorSystem system = ActorSystem.create("actor-demo-java"); ActorRef bob = system.actorOf(Greeter.props("Bob", "Howya doing")); ActorRef alice = system.actorOf(Greeter.props("Alice", "Happy to meet you")); bob.tell(new Greet(alice), ActorRef.noSender()); alice.tell(new Greet(bob), ActorRef.noSender()); try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ } system.shutdown(); } // messages private static class Greet { public final ActorRef target; public Greet(ActorRef actor) { target = actor; } } private static Object AskName = new Object(); private static class TellName { public final String name; public TellName(String name) { this.name = name; } } // actor implementation private static class Greeter extends UntypedActor { private final String myName; private final String greeting; Greeter(String name, String greeting) { myName = name; this.greeting = greeting; } public static Props props(String name, String greeting) { return Props.create(Greeter.class, name, greeting); } public void onReceive(Object message) throws Exception { if (message instanceof Greet) { ((Greet)message).target.tell(AskName, self()); } else if (message == AskName) { sender().tell(new TellName(myName), self()); } else if (message instanceof TellName) { System.out.println(greeting + ", " + ((TellName)message).name); } } } }
清單 7 顯示了包含 lambda 的 Java 8 版本。同樣,此版本在消息處理的實現方面要更為緊湊,但其他方面都是相同的。
清單 7. Java 8 版本
import akka.japi.pf.ReceiveBuilder; ... private static class Greeter extends AbstractActor { private final String myName; private final String greeting; Greeter(String name, String greeting) { myName = name; this.greeting = greeting; receive(ReceiveBuilder. match(Greet.class, g -> { g.target.tell(AskName, self()); }). matchEquals(AskName, a -> { sender().tell(new TellName(myName), self()); }). match(TellName.class, t -> { System.out.println(greeting + ", " + t.name); }). build()); } public static Props props(String name, String greeting) { return Props.create(Greeter.class, name, greeting); } }
傳遞屬性
Akka 使用Props對象將各種配置屬性傳遞給 actor。每個Props實例包裝 actor 類所需的構造函數參數的一個副本,以及對該類的引用。可通過兩種方式將此信息傳遞給Props構造函數。清單 5 中的示例將 actor 的構造函數作為一個名稱傳遞 (pass-by-name) 參數傳遞給Props構造函數。注意,此方式不會直接調用構造函數并傳遞結果;它傳遞構造函數調用(如果您有 Java 工作背景,可能覺得這很陌生)。
將 actor 配置傳遞給Props構造函數的另一種方法是,提供 actor 的類作為第一個參數,將 actor 的構造函數參數作為剩余的參數。對于 清單 5 中的示例,這種調用形式為Props(classOf[Greeter], name, greeting)。
無論使用哪種形式的Props構造函數,傳遞給新 actor 的值都需要可序列化,以便在必要時通過網絡將Props發送到可運行該 actor 實例的任何地方。對于名稱傳遞構造函數調用的情況,就像 清單 5 中的用法,需要將調用發送出 JVM 時,會序列化調用的閉包。
在 Scala 代碼中創建Props對象的 Akka 建議做法是:在一個配套對象中定義工廠方法,就像 清單 5 中所做的那樣。對Props使用名稱傳遞構造函數調用方法時,此技術可阻止任何問題意外地關閉對 actor 對象的this引用。配套對象也是定義 actor 將接收的各種消息的不錯地方,這樣,所有關聯的信息都位于同一位置。對于 Java actor,也可在 actor 類中使用靜態構造函數方法,如 清單 6 中所用的方法。
發送消息的 Actor
清單 5 中的每個Greeteractor 都配置了一個名稱和一句問候語,但將問候語告知另一個 actor 時,首先要找到另一個 actor 的名稱。Greeteractor 通過向另一個 actor 發送一條單獨的消息來完成此任務:AskName消息。AskName消息本身不含任何信息,但收到它的Greeter實例知道應使用一個包含TellName發送方名稱的TellName消息作為響應。當第一個Greeter收到所返回的TellName消息時,它打印出自己的問候語。
發送給 actor 的每個消息都包含由 Akka 提供的一些附加信息,最特別的是消息發送方的ActorRef。您可在消息處理過程中的任何時刻,通過調用在 actor 基類上定義的sender()方法來訪問這些發送方的信息。Greeteractor 在處理AskName消息的過程中會使用發送方引用,以便將TellName響應發送給正確的 actor。
Akka 允許您代表另一個 actor 發送消息(一種良性的身份盜竊形式),以便收到該消息的 actor 將另一個 actor 視為發送方。這是在 actor 系統中經常使用的一個有用特性,尤其是對于請求-響應類型的消息交換,因為此時您希望將響應傳送到不同于發出請求的 actor 的某個地方。actor 外部的應用程序代碼所發出的消息,默認將使用名為 deadletter actor 的特殊 Akka 作為發送方。任何時候無法將消息傳送給 actor 時,也可使用 deadletter actor,這為用戶提供了一種便捷的方式,在 actor 系統中通過打開合適的日志(我將在下一期中介紹)來跟蹤無法傳送的消息。
設置 actor 的類型
您可能注意到了,示例的消息序列中沒有任何類型的信息來明確表明消息的目標是Greeter實例。Akka actor 及其交換的消息一般都屬于這種情況。甚至用于表示消息目標 actor 的ActorRef也是無類型的。
編寫無類型的 actor 系統有著實際的優勢。您可以 定義 actor 類型(比如通過它們可處理的一組消息),但這么做有誤導性。在 Akka 中,actor 可以改變它們的行為(下一期會更詳細地介紹此內容),所以不同的消息集可能適合不同的 actor 狀態。類型也可能妨礙我們合理地簡化 actor 模型,因為系統將所有 actor 視為至少擁有處理任何消息的潛力。
但是 Akka 仍然支持類型化 actor,以防您確實想要使用這種方法。這種支持在連接 actor 和非 actor 節點時最有用。您可定義一個接口,非 actor 節點使用該接口與 actor 進行通信,使 actor 看起來更像是正常的程序組件。對于大部分操作,這樣做所帶來的麻煩太多,可能不值得去做,但考慮到從 actor 系統外部直接將消息發送給 actor 的簡單性(從目前為止的任何示例應用程序中可以看到,非 actor 代碼可以發送消息),有這個選項也很不錯。
消息和可變性
Akka 希望您肯定不會意外地在 actor 之間共享可變的數據。如果共享可變的數據,結果會非常糟 — 比不上在對抗幽靈時穿過自己的質子束(如果您不太熟悉,參見電影做鬼敢死隊),但仍然很糟。共享可變數據的問題在于,actor 在單獨的線程中運行。如果在 actor 之間共享可變數據,則無法在運行 actor 的線程之間進行協調,所以各個線程不會看到其他線程正在做什么,并且可能通過多種不同的方式對彼此造成破壞。如果正在運行分布式系統,問題會更嚴重,每個 actor 都將擁有自己的可變數據副本。
所以消息必須是不可變的,而且不僅僅是在表面層面上。如果消息數據中包含任何對象,這些對象必須也是不可變的,依此類推,一直到消息所引用的所有對象。Akka 目前不能強制實施此要求,但 Akka 開發人員希望在將來的某個時刻能強制實施這些限制。如果您希望自己代碼在未來的 Akka 版本中仍可使用,那么現在必須留意這一要求。
詢問與告訴
清單 5 中的代碼使用標準tell操作來發送消息。在 Akka 中,也可使用ask消息模式作為一種輔助性操作。ask操作(由?運算符或使用ask函數表示)發送一條包含Future的消息作為響應。在清單 8 中,我們重建了清單 5 中的代碼,使用ask來代替tell。
清單 8. 使用ask
import scala.concurrent.duration._ import akka.actor._ import akka.util._ import akka.pattern.ask object Hello4 extends App { import Greeter._ val system = ActorSystem("actor-demo-scala") val bob = system.actorOf(props("Bob", "Howya doing")) val alice = system.actorOf(props("Alice", "Happy to meet you")) bob ! Greet(alice) alice ! Greet(bob) Thread sleep 1000 system shutdown object Greeter { case class Greet(peer: ActorRef) case object AskName def props(name: String, greeting: String) = Props(new Greeter(name, greeting)) } class Greeter(myName: String, greeting: String) extends Actor { import Greeter._ import system.dispatcher implicit val timeout = Timeout(5 seconds) def receive = { case Greet(peer) => { val futureName = peer ? AskName futureName.foreach { name => println(s"$greeting, $name") } } case AskName => sender ! myName } } }
在清單 8 的代碼中,TellName消息已被替換為ask。ask操作返回的 future 的類型為Future[Any],因為編譯器對要返回的結果一無所知。當 future 完成時,foreach使用import system.dispatcher語句所定義的隱式調度器來執行println。如果 future 未完成且在允許的超時(另一個隱式值,在本例中定義為 5 秒)內提供了響應消息,它會完成并拋出超時異常。
在幕后,ask模式創建一個特殊的一次性 actor 在消息交換中充當中介。該中介會收到一個Promise和要發送的消息,以及目標 actor 引用。它發送消息,然后等待期望的響應消息。收到響應后,它會履行承諾并完成最初的 actor 所使用的 future。
使用ask方法有一些限制。具體來講,要避免公開 actor 狀態(可能導致線程問題),必須確保您未在 future 完成時所執行的代碼中使用來自該 actor 的任何可變狀態。在實際情況中,為在 actor 之間發送的消息使用tell模式通常要更容易。ask 模式更有用的一種情況是,應用程序代碼需要從 actor(無論是否具有類型)獲取響應時(比如啟動 actor 系統和創建初始 actor 的主程序)。
小角色
“在對您明確處理異步操作有所幫助時,應毫不猶豫地向設計中引入新的 actor。”
ask 模式所創建的一次性 actor 是在使用 Akka 時要記住的一種出色設計原則。通常您希望構造您的 actor 系統,以便中間處理步驟是由為這種特定的用途而設計的特殊 actor 所執行的。一個常見的例子是,需要在進入下一處理階段之前合并不同的異步結果。如果為不同的結果使用消息,您可讓一個 actor 來收集各個結果,直到所有結果都準備好,然后觸發下一階段的操作。這基本上就是 ask 模式所用的一般的一次性 actor。
Akka actor 是輕型的(每個 actor 實例大約 300 到 400 字節,無論 actor 類使用哪種存儲都是如此),所以您可安全地設計程序結構,在適當的時候使用多個 actor。使用專用的 actor 有助于保持代碼簡單且易于理解,這也是編寫并發程序與編寫順序程序相比的一個優勢。在對您明確處理異步操作有所幫助時,應毫不猶豫地向設計中引入新的 actor。
補充幾句
Akka 是一個強大的系統,但 Akka 和 actor 模型通常都需要一種與直觀的過程代碼不同的編程風格。對于過程代碼,程序結構中的所有調用都是確定的,并且您可查看程序的整個調用樹。在 actor 模型中,消息是被樂觀地觸發的,無法保證它們將會送達,而且常常很難確定事件發生的順序。actor 模型的好處是,這是一種構建高并發性和可伸縮性應用程序的輕松方式,我在后面的幾期中會再次介紹此主題。
希望本文能夠讓您足夠清楚地了解 Akka,激起您進一步探索該內容的欲望。下一次我將更深入地介紹 actor 系統和 actor 交互,包括如何輕松地跟蹤系統中各 actor 之間的交互。