Java并發的四種風味:Thread、Executor、ForkJoin和Actor
來自: http://blog.csdn.net//chenleixing/article/details/44044243
這篇文章討論了Java應用中并行處理的多種方法。從自己管理Java線程,到各種更好幾的解決方法,Executor服務、ForkJoin 框架以及計算中的Actor模型。
Java并發編程的4種風格:Threads,Executors,ForkJoin和Actors
我們生活在一個事情并行發生的世界。自然地,我們編寫的程序也反映了這個特點,它們可以并發的執行。當然除了Python代碼(譯者注:鏈接里面講述了Python的全局解釋器鎖,解釋了原因),不過你仍然可以使用Jython在JVM上運行你的程序,來利用多處理器電腦的強大能力。
然而,并發程序的復雜程度遠遠超出了人類大腦的處理能力。相比較而言,我們簡直弱爆了:我們生來就不是為了思考多線程程序、評估并發訪問有限資源以及預測哪里會發生錯誤或者瓶頸。
面對這些困難,人類已經總結了不少并發計算的解決方案和模型。這些模型強調問題的不同部分,當我們實現并行計算時,可以根據問題做出不同的選擇。
在這篇文章中,我將會用對同一個問題,用不同的代碼來實現并發的解決方案;然后討論這些方案有哪些好的地方,有哪些缺陷,可能會有什么樣的陷阱在等著你。
我們將介紹下面幾種并發處理和異步代碼的方式:
? 裸線程
? Executors和Services
? ForkJoin框架和并行流
? Actor模型
為了更加有趣一些,我沒有僅僅通過一些代碼來說明這些方法,而是使用了一個共同的任務,因此每一節中的代碼差不多都是等價的。另外,這些代碼僅僅是展示用的,初始化的代碼并沒有寫出來,并且它們也不是產品級的軟件示例。
對了,最后一件事:在文章最后,有一個小調查,關于你或者你的組織正在使用哪種并發模式。為了你的工程師同胞們,請填一下調查!
任務
任務:實現一個方法,它接收一條消息和一組字符串作為參數,這些字符串與某個搜索引擎的查詢頁面對應。對每個字符串,這個方法發出一個http請求來查詢消息,并返回第一條可用的結果,越快越好。
如果有錯誤發生,拋出一個異常或者返回空都是可以的。我只是嘗試避免為了等待結果而出現無限循環。
簡單說明:這次我不會真正深入到多線程如何通訊的細節,或者深入到Java內存模型。如果你迫切地想了解這些,你可以看我前面的文章利用JCStress測試并發。
那么,讓我們從最直接、最核心的方式來在JVM上實現并發:手動管理裸線程。
方法1:使用“原汁原味”的裸線程
解放你的代碼,回歸自然,使用裸線程!線程是并發最基本的單元。Java線程本質上被映射到操作系統線程,并且每個線程對象對應著一個計算機底層線程。
自然地,JVM管理著線程的生存期,而且只要你不需要線程間通訊,你也不需要關注線程調度。
每個線程有自己的棧空間,它占用了JVM進程空間的指定一部分。
線程的接口相當簡明,你只需要提供一個Runnable,調用.start()開始計算。沒有現成的API來結束線程,你需要自己來實現,通過類似boolean類型的標記來通訊。
在下面的例子中,我們對每個被查詢的搜索引擎,創建了一個線程。查詢的結果被設置到AtomicReference,它不需要鎖或者其他機制來保證只出現一次寫操作。開始吧!
1
2
3
4
5
6
7
8
9
10
11
|
private
static
String getFirstResult(String question, List<String> engines) { AtomicReference<String> result = new
AtomicReference<>(); for (String base: engines) { String url = base + question; new
Thread(() -> { result.compareAndSet( null , WS.url(url).get()); }).start(); } while (result.get() == null ); // wait for some result to appear return
result.get(); } |
使用裸線程的主要優點是,你很接近并發計算的操作系統/硬件模型,并且這個模型非常簡單。多個線程運行,通過共享內存通訊,就是這樣。
自己管理線程的最大劣勢是,你很容易過分的關注線程的數量。線程是很昂貴的對象,創建它們需要耗費大量的內存和時間。這是一個矛盾,線程太少,你不能獲得良好的并發性;線程太多,將很可能導致內存問題,調度也變得更復雜。
然而,如果你需要一個快速和簡單的解決方案,你絕對可以使用這個方法,不要猶豫。
方法2:認真對待Executor和CompletionService
另一個選擇是使用API來管理一組線程。幸運的是,JVM為我們提供了這樣的功能,就是Executor接口。Executor接口的定義非常簡單:
1
2
3
4
5
|
public
interface
Executor { void
execute(Runnable command); } |
它隱藏了如何處理Runnable的細節。它僅僅說,“開發者!你只是一袋肉,給我任務,我會處理它!”
更酷的是,Executors類提供了一組方法,能夠創建擁有完善配置的線程池和executor。我們將使用newFixedThreadPool(),它創建預定義數量的線程,并不允許線程數量超過這個預定義值。這意味著,如果所有的線程都被使用的話,提交的命令將會被放到一個隊列中等待;當然這是由executor來管理的。
在它的上層,有ExecutorService管理executor的生命周期,以及CompletionService會抽象掉更多細節,作為已完成任務的隊列。得益于此,我們不必擔心只會得到第一個結果。
下面service.take()的一次調用將會只返回一個結果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
private
static
String getFirstResultExecutors(String question, List<String> engines) { ExecutorCompletionService<String> service = new
ExecutorCompletionService<String>(Executors.newFixedThreadPool( 4 )); for (String base: engines) { String url = base + question; service.submit(() -> { return
WS.url(url).get(); }); } try
{ return
service.take().get(); } catch (InterruptedException | ExecutionException e) { return
null ; } } |
如果你需要精確的控制程序產生的線程數量,以及它們的精確行為,那么executor和executor服務將是正確的選擇。例如,需要仔細考慮的一個重要問題是,當所有線程都在忙于做其他事情時,需要什么樣的策略?增加線程數量或者不做數量限制?把任務放入到隊列等待?如果隊列也滿了呢?無限制的增加隊列大小?
感謝JDK,已經有很多配置項回答了這些問題,并且有著直觀的名字,例如上面的Executors.newFixedThreadPool(4)。
線程和服務的生命周期也可以通過選項來配置,使資源可以在恰當的時間關閉。唯一的不便之處是,對新手來說,配置選項可以更簡單和直觀一些。然而,在并發編程方面,你幾乎找不到更簡單的了。
總之,對于大型系統,我個人認為使用executor最合適。
方法3:通過并行流,使用ForkJoinPool (FJP)
Java 8中加入了并行流,從此我們有了一個并行處理集合的簡單方法。它和lambda一起,構成了并發計算的一個強大工具。
如果你打算運用這種方法,那么有幾點需要注意。首先,你必須掌握一些函數編程的概念,它實際上更有優勢。其次,你很難知道并行流實際上是否使用了超過一個線程,這要由流的具體實現來決定。如果你無法控制流的數據源,你就無法確定它做了什么。
另外,你需要記住,默認情況下是通過ForkJoinPool.commonPool()實現并行的。這個通用池由JVM來管理,并且被JVM進程內的所有線程共享。這簡化了配置項,因此你不用擔心。
1
2
3
4
5
6
7
8
|
private
static
String getFirstResult(String question, List<String> engines) { // get element as soon as it is available Optional<String> result = engines.stream().parallel().map((base) -> { String url = base + question; return
WS.url(url).get(); }).findAny(); return
result.get(); } |
看上面的例子,我們不關心單獨的任務在哪里完成,由誰完成。然而,這也意味著,你的應用程序中可能存在一些停滯的任務,而你卻無法不知道。在另一篇關于并行流的文章中,我詳細地描述了這個問題。并且有一個變通的解決方案,雖然它并不是世界上最直觀的方案。
ForkJoin是一個很好的框架,由比我更聰明的人來編寫和預先配置。因此當我需要寫一個包含并行處理的小型程序時,它是我的第一選擇。
它最大的缺點是,你必須預見到它可能產生的并發癥。如果對JVM沒有整體上的深入了解,這很難做到。這只能來自于經驗。
方法4:雇用一個Actor
Actor模型是對我們本文中所探討的方法的一個奇怪的補充。JDK中沒有actor的實現;因此你必須引用一些實現了actor的庫。
簡短地說,在actor模型中,你把一切都看做是一個actor。一個actor是一個計算實體,就像上面第一個例子中的線程,它可以從其他actor那里接收消息,因為一切都是actor。
在應答消息時,它可以給其他actor發送消息,或者創建新的actor并與之交互,或者只改變自己的內部狀態。
相當簡單,但這是一個非常強大的概念。生命周期和消息傳遞由你的框架來管理,你只需要指定計算單元是什么就可以了。另外,actor模型強調避免全局狀態,這會帶來很多便利。你可以應用監督策略,例如免費重試,更簡單的分布式系統設計,錯誤容忍度等等。
下面是一個使用Akka Actors的例子。Akka Actors有Java接口,是最流行的JVM Actor庫之一。實際上,它也有Scala接口,并且是Scala目前默認的actor庫。Scala曾經在內部實現了actor。不少JVM語言都實現了actor,比如Fantom。這些說明了Actor模型已經被廣泛接受,并被看做是對語言非常有價值的補充。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
static
class
Message { String url; Message(String url) { this .url = url;} } static
class
Result { String html; Result(String html) { this .html = html;} } static
class
UrlFetcher extends
UntypedActor { @Override public
void
onReceive(Object message) throws
Exception { if
(message instanceof
Message) { Message work = (Message) message; String result = WS.url(work.url).get(); getSender().tell( new
Result(result), getSelf()); } else
{ unhandled(message); } } } static
class
Querier extends
UntypedActor { private
String question; private
List<String> engines; private
AtomicReference<String> result; public
Querier(String question, List<String> engines, AtomicReference<String> result) { this .question = question; this .engines = engines; this .result = result; } @Override
public
void
onReceive(Object message) throws
Exception { if (message instanceof
Result) { result.compareAndSet( null , ((Result) message).html); getContext().stop(self()); } else
{ for (String base: engines) { String url = base + question; ActorRef fetcher = this .getContext().actorOf(Props.create(UrlFetcher. class ), "fetcher-" +base.hashCode()); Message m = new
Message(url); fetcher.tell(m, self()); } } } } private
static
String getFirstResultActors(String question, List<String> engines) { ActorSystem system = ActorSystem.create( "Search" ); AtomicReference<String> result = new
AtomicReference<>(); final
ActorRef q = system.actorOf( Props.create((UntypedActorFactory) () -> new
Querier(question, engines, result)), "master" ); q.tell( new
Object(), ActorRef.noSender()); while (result.get() == null ); return
result.get(); } |
Akka actor在內部使用ForkJoin框架來處理工作。這里的代碼很冗長。不要擔心。大部分代碼是消息類Message和Result的定義,然后是兩個不同的actor:Querier用來組織所有的搜索引擎,而URLFetcher用來從給定的URL獲取結果。這里代碼行比較多是因為我不愿意把很多東西寫在同一行上。Actor模型的強大之處來自于Props對象的接口,通過接口我們可以為actor定義特定的選擇模式,定制的郵箱地址等。結果系統也是可配置的,只包含了很少的活動件。這是一個很好的跡象!
使用Actor模型的一個劣勢是,它要求你避免全局狀態,因此你必須小心的設計你的應用程序,而這可能會使項目遷移變得很復雜。同時,它也有不少優點,因此學習一些新的范例和使用新的庫是完全值得的。
反饋時間:你使用什么?
你最常用的并發方式是什么?你理解它背后的計算模式是什么嗎?僅僅使用一個包含Job或者后臺任務對象的框架來自動地為你的代碼添加異步計算能力?
為了收集更多信息,以找出我是否應該繼續更深入地講解一些不同的并發模式,例如,寫一篇關于Akka如何工作,以及它Java接口的優點和缺點,我創建了一個簡單的調查。親愛的讀者,請填一下調查表。我非常感謝你的互動!
總結
這篇文章中我們討論了在Java應用中添加并行的幾種不同方法。從我們自己管理Java線程開始,我們逐漸地發現更高級的解決方案,執行不同的executor服務、ForkJoin框架和actor計算模型。
不知道當你面臨真實問題時該如何選擇?它們都有各自的優缺點,你需要在直觀和易用性、配置和增加/減少機器性能等方面做出選擇。
翻譯: ImportNew.com - shenggordon
譯文鏈接: http://www.importnew.com/14506.html