【Akka】在并發程序中使用Future
引言
在Akka中, 一個 Future 是用來獲取某個并發操作的結果的數據結構。這個操作通常是由Actor執行或由Dispatcher直接執行的. 這個結果可以以同步(阻塞)或異步(非阻塞)的方式訪問。
Future提供了一種簡單的方式來執行并行算法。
Future直接使用
Future中的一個常見用例是在不需要使用Actor的情況下并發地執行計算。Future有兩種使用方式:
- 阻塞方式(Blocking):該方式下,父actor或主程序停止執行知道所有future完成各自任務。通過 scala.concurrent.Await 使用。
- 非阻塞方式(Non-Blocking),也稱為回調方式(Callback):父actor或主程序在執行期間啟動future,future任務和父actor并行執行,當每個future完成任務,將通知父actor。通過 onComplete 、 onSuccess 、 onFailure 方式使用。
執行上下文(ExecutionContext)
為了運行回調和操作,Futures需要有一個 ExecutionContext 。
如果你在作用域內有一個 ActorSystem ,它會它自己派發器用作ExecutionContext,你也可以用ExecutionContext伴生對象提供的工廠方法來將Executors和ExecutorServices進行包裹,或者甚至創建自己的實例。
通過導入 ExecutionContext.Implicits.global 來導入默認的全局執行上下文。你可以把該執行上下文看做是一個線程池,ExecutionContext是在某個線程池執行任務的抽象。
如果在代碼中沒有導入該執行上下文,代碼將無法編譯。
阻塞方式
第一個例子展示如何創建一個future,然后通過阻塞方式等待其計算結果。雖然阻塞方式不是一個很好的用法,但是可以說明問題。這個例子中,通過在未來某個時間計算1+1,當計算結果后再返回。
importscala.concurrent.{Await,Future} importscala.concurrent.duration._ importscala.concurrent.ExecutionContext.Implicits.globalobjectFutureBlockDemoextendsApp{ implicit val baseTime = System.currentTimeMillis
// create a Future val f = Future{ Thread.sleep(500) 1+1 } // this isblocking(blockingisbad) val result=Await.result(f,1second) // 如果Future沒有在Await規定的時間里返回, // 將拋出java.util.concurrent.TimeoutException println(result) Thread.sleep(1000) }</pre>
代碼解釋:
- 在上面的代碼中,被傳遞給Future的代碼塊會被缺省的 Dispatcher 所執行,代碼塊的返回結果會被用來完成 Future 。 與從Actor返回的Future不同,這個Future擁有正確的類型, 我們還避免了管理Actor的開銷。
- Await.result 方法將阻塞1秒時間來等待Future結果返回,如果Future在規定時間內沒有返回,將拋出 java.util.concurrent.TimeoutException 異常。
- 通過導入 scala.concurrent.duration._ ,可以用一種方便的方式來聲明時間間隔,如 100 nanos , 500 millis , 5 seconds 、 1 minute 、 1 hour , 3 days 。還可以通過 Duration(100, MILLISECONDS) , Duration(200, "millis") 來創建時間間隔。
非阻塞方式(回調方式)
有時你只需要監聽 Future 的完成事件,對其進行響應,不是創建新的Future,而僅僅是產生副作用。
通過 onComplete , onSuccess , onFailure 三個回調函數來異步執行Future任務,而后兩者僅僅是第一項的特例。
使用onComplete的代碼示例:
importscala.concurrent.{Future} importscala.concurrent.ExecutionContext.Implicits.global importscala.util.{Failure, Success} importscala.util.RandomobjectFutureNotBlockextendsApp{ println("starting calculation ...") valf = Future { Thread.sleep(Random.nextInt(500)) 42 }
println("before onComplete") f.onComplete{ caseSuccess(value) => println(s"Got the callback, meaning = $value") caseFailure(e) => e.printStackTrace }
// do the rest of your work println("A ...") Thread.sleep(100) println("B ....") Thread.sleep(100) println("C ....") Thread.sleep(100) println("D ....") Thread.sleep(100) println("E ....") Thread.sleep(100)
Thread.sleep(2000) }</pre>
使用onSuccess、onFailure的代碼示例:
importscala.concurrent.{Future} importscala.concurrent.ExecutionContext.Implicits.global importscala.util.{Failure, Success} importscala.util.RandomobjectTest12_FutureOnSuccessAndFailureextendsApp{ valf = Future { Thread.sleep(Random.nextInt(500)) if(Random.nextInt(500) >250)thrownewException("Tikes!")else42 }
f onSuccess { caseresult => println(s"Success: $result") }
f onFailure { caset => println(s"Exception: ${t.getMessage}") }
// do the rest of your work println("A ...") Thread.sleep(100) println("B ....") Thread.sleep(100) println("C ....") Thread.sleep(100) println("D ....") Thread.sleep(100) println("E ....") Thread.sleep(100)
Thread.sleep(1000) }</pre>
代碼解釋:
上面兩段例子中,Future結構中隨機延遲一段時間,然后返回結果或者拋出異常。
然后在回調函數中進行相關處理。
</div>
創建返回Future[T]的方法
先看一下示例:
importscala.concurrent.{Await,Future, future} importscala.concurrent.ExecutionContext.Implicits.global importscala.util.{Failure,Success}objectReturnFutureextendsApp{ implicit val baseTime = System.currentTimeMillis
//
future
methodisanother way to create a future // Itstarts the computation asynchronouslyandretures aFuture[Int] that // will hold the resultofthe computation. def longRunningComputation(i: Int):Future[Int] = future { Thread.sleep(100) i + 1 }// this does notblock longRunningComputation(11).onComplete { caseSuccess(result) => println(s"result = $result") caseFailure(e) => e.printStackTrace }
// keep the jvm fromshutting down Thread.sleep(1000) }</pre>
代碼解釋:
上面代碼中的longRunningComputation返回一個 Future[Int] ,然后進行相關的異步操作。
其中 future 方法是創建一個future的另一種方法。它將啟動一個異步計算并且返回包含計算結果的 Future[T] 。
</div>
Future用于Actor
通常有兩種方法來從一個Actor獲取回應: 第一種是發送一個消息 actor ! msg ,這種方法只在發送者是一個Actor時有效;第二種是通過一個Future。
使用Actor的 ? 方法來發送消息會返回一個Future。 要等待并獲取結果的最簡單方法是:
importscala.concurrent.Await importakka.pattern.ask importscala.concurrent.duration._ importakka.util.Timeoutimplicit val timeout = Timeout(5seconds) val future = actor ? msg val result=Await.result(future, timeout.duration).asInstanceOf[String]</pre>
下面是使用 ? 發送消息給actor,并等待回應的代碼示例:
importakka.actor. importakka.pattern.ask importakka.util.Timeout importscala.concurrent.{Await, Future} importscala.language.postfixOps importscala.concurrent.duration.caseobjectAskNameMessage
classTestActorextendsActor{ defreceive = { caseAskNameMessage =>// respond to the 'ask' request sender ! "Fred" case_ => println("that was unexpected") } } objectAskDemoextendsApp{ //create the system and actor valsystem = ActorSystem("AskDemoSystem") valmyActor = system.actorOf(Props[TestActor], name="myActor")
// (1) this is one way to "ask" another actor for information implicit valtimeout = Timeout(5seconds) valfuture = myActor ? AskNameMessage valresult = Await.result(future, timeout.duration).asInstanceOf[String] println(result)
// (2) a slightly different way to ask another actor for information valfuture2: Future[String] = ask(myActor, AskNameMessage).mapTo[String] valresult2 = Await.result(future2,1second) println(result2)
system.shutdown }</pre>
代碼解釋:
- Await.result(future, timeout.duration).asInstanceOf[String] 會導致當前線程被阻塞,并等待actor通過它的應答來完成 Future 。但是阻塞會導致性能問題,所以是不推薦的。致阻塞的操作位于 Await.result 和 Await.ready 中,這樣就方便定位阻塞的位置。
- 還要注意actor返回的Future的類型是 Future[Any] ,這是因為actor是動態的。 這也是為什么上例中注釋(1)使用了 asInstanceOf 。
- 在使用非阻塞方式時,最好使用 mapTo 方法來將Future轉換到期望的類型。如果轉換成功, mapTo 方法會返回一個包含結果的新的 Future,如果不成功,則返回 ClassCastException 異常。
轉載請注明作者Jason Ding及其出處
Github博客主頁(http://jasonding1354.github.io/)
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進入我的博客主頁
</div> </div>
來自: http://jasonding1354.github.io/2016/01/19/Scala/【Akka】在并發程序中使用Future/
本文由用戶 tu7648 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!相關經驗
相關資訊