次時代Java編程(一):Java里的協程
作者:劉小溪,Maxleap的高級開發工程師,喜歡倒騰一些有意思的技術框架,對新的技術以及語言非常有興趣,以前在shopex擔任架構師,目前在Maxleap負責基礎架構以及服務框架這塊技術,同時也會對Vert.x的社區提供一些開源上的支持。
責編:錢曙光,關注架構和算法領域,尋求報道或者投稿請發郵件qianshg@csdn.net,另有「CSDN 高級架構師群」,內有諸多知名互聯網公司的大牛架構師,歡迎架構師加微信qshuguang2008申請入群,備注姓名+公司+職位。
什么是協程(coroutine)
這東西其實有很多名詞,比如有的人喜歡稱為纖程(Fiber),或者綠色線程(GreenThread)。其實最直觀的解釋可以定義為線程的線程。有點拗口,但本質上就是這樣。
我們先回憶一下線程的定義,操作系統產生一個進程,進程再產生若干個線程 并行 的處理邏輯,線程的切換由操作系統負責調度。傳統語言C++ Java等線程其實與操作系統線程是1:1的關系,每個線程都有自己的Stack,Java在64位系統默認Stack大小是1024KB,所以指望一個進程開啟上萬個線程是不現實的。但是實際上我們也不會這么干,因為起這么多線程并不能充分的利用CPU,大部分線程處于等待狀態,CPU也沒有這么核讓線程使用。所以一般線程數目都是CPU的核數。
傳統的J2EE系統都是基于每個請求占用一個線程去完成完整的業務邏輯(包括事務)。所以系統的吞吐能力取決于每個線程的操作耗時。如果遇到很耗時的I/O行為,則整個系統的吞吐立刻下降,比如JDBC是同步阻塞的,這也是為什么很多人都說數據庫是瓶頸的原因。這里的耗時其實是讓CPU一直在等待I/O返回,說白了線程根本沒有利用CPU去做運算,而是處于空轉狀態。暴殄天物啊。另外過多的線程,也會帶來更多的ContextSwitch開銷。
Java的JDK里有封裝很好的ThreadPool,可以用來管理大量的線程生命周期,但是本質上還是不能很好的解決線程數量的問題,以及線程空轉占用CPU資源的問題。
先階段行業里的比較流行的解決方案之一就是單線程加上異步回調。其代表派是 node.js 以及Java里的新秀 Vert.x 。他們的核心思想是一樣的,遇到需要進行I/O操作的地方,就直接讓出CPU資源,然后注冊一個回調函數,其他邏輯則繼續往下走,I/O結束后帶著結果向事件隊列里插入執行結果,然后由事件調度器調度回調函數,傳入結果。這時候執行的地方可能就不是你原來的代碼區塊了,具體表現在代碼層面上,你會發現你的局部變量全部丟失,畢竟相關的棧已經被覆蓋了,所以為了保存之前的棧上數據,你要么選擇帶著一起放入回調函數里,要么就不停的嵌套,從而引起反人類的Callback hell。
因此相關的Promise,CompletableFuture等技術都是為解決相關的問題而產生的。但是本質上還是不能解決業務邏輯的割裂。
說了這么多,終于可以提一下協程了,協程的本質上其實還是和上面的方法一樣,只不過他的核心點在于調度那塊由他來負責解決,遇到阻塞操作,立刻yield掉,并且記錄當前棧上的數據,阻塞完后立刻再找一個線程恢復棧并把阻塞的結果放到這個線程上去跑,這樣看上去好像跟寫同步代碼沒有任何差別,這整個流程可以稱為 coroutine ,而跑在由coroutine負責調度的線程稱為 Fiber 。比如Golang里的 go 關鍵字其實就是負責開啟一個 Fiber ,讓 func 邏輯跑在上面。而這一切都是發生的用戶態上,沒有發生在內核態上,也就是說沒有ContextSwitch上的開銷。
既然我們的標題叫Java里的協程,自然我們會討論JVM上的實現,JVM上早期有 kilim 以及現在比較成熟的 Quasar 。而本文章會全部基于 Quasar ,因為 kilim 已經很久不更新了。
簡單的例子,用Java寫出Golang的味道
上面已經說明了什么是 Fiber ,什么是 coroutine 。這里嘗試通過 Quasar 來實現類似于golang的 coroutine 以及 channel 。這里假設各位已經大致了解golang。
為了對比,這里先用golang實現一個對于10以內自然數分別求平方的例子,當然了可以直接單線程for循環就完事了,但是為了凸顯coroutine的高逼格,我們還是要稍微復雜化一點的。
func counter(out chan<- int) {
for x := 0; x < 10; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
//定義兩個int類型的channel
naturals := make(chan int)
squares := make(chan int)
//產生兩個Fiber,用go關鍵字
go counter(naturals)
go squarer(squares, naturals)
//獲取計算結果
printer(squares)
}
上面的例子,有點類似生產消費者模式,通過channel兩解耦兩邊的數據共享。大家可以將channel理解為Java里的 SynchronousQueue 。那傳統的基于線程模型的Java實現方式,想必大家都知道怎么做,這里就不啰嗦了,我直接上 Quasar 版的,幾乎可以原封不動的copy golang的代碼。
public class Example {
private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException {
Integer v;
while ((v = in.receive()) != null) {
System.out.println(v);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
//定義兩個Channel
Channel<Integer> naturals = Channels.newChannel(-1);
Channel<Integer> squares = Channels.newChannel(-1);
//運行兩個Fiber實現.
new Fiber(() -> { for (int i = 0; i < 10; i++) naturals.send(i); naturals.close(); }).start(); new Fiber(() -> { Integer v; while ((v = naturals.receive()) != null) squares.send(v * v); squares.close(); }).start(); printer(squares); } }
看起來Java似乎要啰嗦一點,沒辦法這是Java的風格,而且畢竟不是語言上支持coroutine,是通過第三方的庫。到后面我會考慮用其他JVM上的語言去實現,這樣會顯得更精簡一點。
說到這里各位肯定對Fiber很好奇了。也許你會表示懷疑Fiber是不是如上面所描述的那樣,下面我們嘗試用Quasar建立一百萬個Fiber,看看內存占用多少,我先嘗試了創建百萬個 Thread 。
for (int i = 0; i < 1_000_000; i++) {
new Thread(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
很不幸,直接報 Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread ,這是情理之中的。下面是通過 Quasar 建立百萬個 Fiber 。
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
int FiberNumber = 1_000_000;
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < FiberNumber; i++) {
new Fiber(() -> { counter.incrementAndGet(); if (counter.get() == FiberNumber) { System.out.println("done"); } Strand.sleep(1000000); }).start(); } latch.await(); }
我這里加了latch,阻止程序跑完就關閉, Strand.sleep 其實跟 Thread.sleep 一樣,只是這里針對的是 Fiber 。
最終控制臺是可以輸出 done 的,說明程序已經創建了百萬個Fiber,設置Sleep是為了讓 Fiber 一直運行,從而方便計算內存占用。官方宣稱一個空閑的 Fiber 大約占用 400Byte ,那這里應該是占用 400MB 堆內存,但是這里通過 jmap -heap pid 顯示大約占用了 1000MB ,也就是說一個 Fiber 占用1KB。
Quasar是怎么實現Fiber的
其實Quasar實現的coroutine的方式與Golang很像,只不過一個是框架級別實現,一個是語言內置機制而已。
如果你熟悉了Golang的調度機制,那理解Quasar的調度機制就會簡單很多,因為兩者是差不多的。
Quasar里的Fiber其實是一個continuation,他可以被Quasar定義的scheduler調度,一個continuation記錄著運行實例的狀態,而且會被隨時中斷,并且也會隨后在他被中斷的地方恢復。Quasar其實是通過修改bytecode來達到這個目的,所以運行Quasar程序的時候,你需要先通過java-agent在運行時修改你的代碼,當然也可以在編譯期間這么干。golang的內置了自己的調度器,Quasar則默認使用 ForkJoinPool 這個JDK7以后才有的,具有 work-stealing 功能的線程池來當調度器。work-stealing非常重要,因為你不清楚哪個Fiber會先執行完,而work-stealing可以動態的從其他的等等隊列偷一個context過來,這樣可以最大化使用CPU資源。
那這里你會問了,Quasar怎么知道修改哪些字節碼呢,其實也很簡單,Quasar會通過java-agent在運行時掃描哪些方法是可以中斷的,同時會在方法被調用前和調度后的方法內插入一些 continuation 邏輯,如果你在方法上定義了 @Suspendable 注解,那Quasar會對調用該注解的方法做類似下面的事情。
這里假設你在方法 f 上定義了 @Suspendable ,同時去調用了有同樣注解的方法 g ,那么所有調用 f 的方法會插入一些字節碼,這些字節碼的邏輯就是記錄當前Fiber棧上的狀態,以便在未來可以動態的恢復。(Fiber類似線程也有自己的棧)。在 suspendable方法鏈內 Fiber的父類會調用 Fiber.park ,這樣會拋出 SuspendExecution 異常,從而來停止 線程 的運行,好讓Quasar的調度器執行調度。這里的 SuspendExecution 會被Fiber自己捕獲,業務層面上不應該捕獲到。如果Fiber被喚醒了(調度器層面會去調用 Fiber.unpark ),那么 f 會在被中斷的地方重新被調用(這里Fiber會知道自己在哪里被中斷),同時會把 g 的調用結果( g 會return結果)插入到 f 的恢復點,這樣看上去就好像 g 的return是 f 的 local variables 了,從而避免了callback嵌套。
上面啰嗦了一大堆,其實簡單點講就是,想辦法讓運行中的線程棧停下來,好讓Quasar的調度器介入。JVM線程中斷的條件只有兩個,一個是拋異常,另外一個就是return。這里Quasar就是通過拋異常的方式來達到的,所以你會看到我上面的代碼會拋出 SuspendExecution 。但是如果你真捕獲到這個異常,那就說明有問題了,所以一般會這么寫。
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//這里不應該捕獲到異常.
throw new AssertionError(s);
}
}
與Golang性能對比
在github上無意中發現一個有趣的benchmark,大致是測試各種語言在生成百萬actor/Fiber的開銷 skynet 。
大致的邏輯是先生成10個Fiber,每個Fiber再生成10個Fiber,直到生成1百萬個Fiber,然后每個Fiber做加法累積計算,并把結果發到channel里,這樣一直遞歸到根Fiber。后將最終結果發到channel。如果邏輯沒有錯的話結果應該是499999500000。我們搞個Quasar版的,來測試一下性能。
所有的測試都是基于我的Macbook Pro Retina 2013later。Quasar-0.7.5:JDK8,JDK 1.8.0_91,Golang 1.6
public class Skynet {
private static final int RUNS = 4;
private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited
static void skynet(Channel<Long> c, long num, int size, int div) throws SuspendExecution, InterruptedException {
if (size == 1) {
c.send(num);
return;
}
Channel<Long> rc = newChannel(BUFFER);
long sum = 0L;
for (int i = 0; i < div; i++) {
long subNum = num + i * (size / div);
new Fiber(() -> skynet(rc, subNum, size / div, div)).start();
}
for (int i = 0; i < div; i++)
sum += rc.receive();
c.send(sum);
}
public static void main(String[] args) throws Exception {
//這里跑4次,是為了讓JVM預熱好做優化,所以我們以最后一個結果為準。
for (int i = 0; i < RUNS; i++) {
long start = System.nanoTime();
Channel<Long> c = newChannel(BUFFER);
new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start();
long result = c.receive();
long elapsed = (System.nanoTime() - start) / 1_000_000;
System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)");
}
}
}
golang的代碼我就不貼了,大家可以從github上拿到,我這里直接貼出結果。
platform | time |
---|---|
Golang | 261ms |
Quasar | 612ms |
從Skynet測試中可以看出,Quasar的性能對比Golang還是有差距的,但是不應該達到兩倍多吧,經過向Quasar作者求證才得知這個測試并沒有測試出實際性能,只是測試調度開銷而已。
因為skynet方法內部幾乎沒有做任何事情,只是簡單的做了一個加法然后進一步的遞歸生成新的Fiber而已,相當于只是測試了Quasar生成并調度百萬Fiber所需要的時間而已。而Java里的加法操作開銷遠比生成Fiber的開銷要低,因此感覺整體性能不如golang(golang的coroutine是語言級別的)。
實際上我們在實際項目中生成的Fiber中不可能只做一下簡單的加法就退出,至少要花費1ms做一些簡單的事情吧,(Quasar里Fiber的調度差不多在us級別),所以我們考慮在skynet里加一些比較耗時的操作,比如隨機生成1000個整數并對其進行排序,這樣Fiber里算是有了相應的性能開銷,與調度的開銷相比,調度的開銷就可以忽略不計了。(大家可以把調度開銷想象成不定積分的常數)。
下面我分別為兩種語言了加了數組排序邏輯,并插在響應的Fiber里。
public class Skynet {
private static Random random = new Random();
private static final int NUMBER_COUNT = 1000;
private static final int RUNS = 4;
private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited
private static void numberSort() {
int[] nums = new int[NUMBER_COUNT];
for (int i = 0; i < NUMBER_COUNT; i++)
nums[i] = random.nextInt(NUMBER_COUNT);
Arrays.sort(nums);
}
static void skynet(Channel<Long> c, long num, int size, int div) throws SuspendExecution, InterruptedException {
if (size == 1) {
c.send(num);
return;
}
//加入排序邏輯
numberSort();
Channel<Long> rc = newChannel(BUFFER);
long sum = 0L;
for (int i = 0; i < div; i++) {
long subNum = num + i * (size / div);
new Fiber(() -> skynet(rc, subNum, size / div, div)).start();
}
for (int i = 0; i < div; i++)
sum += rc.receive();
c.send(sum);
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < RUNS; i++) {
long start = System.nanoTime();
Channel<Long> c = newChannel(BUFFER);
new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start();
long result = c.receive();
long elapsed = (System.nanoTime() - start) / 1_000_000;
System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)");
}
}
}
const (
numberCount = 1000
loopCount = 1000000
)
//排序函數
func numberSort() {
nums := make([]int, numberCount)
for i := 0; i < numberCount; i++ {
nums[i] = rand.Intn(numberCount)
}
sort.Ints(nums)
}
func skynet(c chan int, num int, size int, div int) {
if size == 1 {
c <- num
return
}
//加了排序邏輯
numberSort()
rc := make(chan int)
var sum int
for i := 0; i < div; i++ {
subNum := num + i*(size/div)
go skynet(rc, subNum, size/div, div)
}
for i := 0; i < div; i++ {
sum += <-rc
}
c <- sum
}
func main() {
c := make(chan int)
start := time.Now()
go skynet(c, 0, loopCount, 10)
result := <-c
took := time.Since(start)
fmt.Printf("Result: %d in %d ms.\n", result, took.Nanoseconds()/1e6)
}
platform | time |
---|---|
Golang | 23615ms |
Quasar | 15448ms |
最后再進行一次測試,發現Java的性能優勢體現出來了。幾乎是golang的1.5倍,這也許是JVM/JDK經過多年優化的優勢。因為加了業務邏輯后,對比的就是各種庫以及編譯器對語言的優化了,協程調度開銷幾乎可以忽略不計。
為什么協程在Java里一直那么小眾
其實早在JDK1的時代,Java的線程被稱為 GreenThread ,那個時候就已經有了Fiber,但是當時不能與操作系統實現N:M綁定,所以放棄了。現在Quasar憑借 ForkJoinPool 這個成熟的線程調度庫。另外,如果你希望你的代碼能夠跑在Fiber里面,需要一個很大的前提條件,那就是你所有的庫,必須是異步無阻塞的,也就說必須類似于node.js上的庫,所有的邏輯都是異步回調,而自Java里基本上所有的庫都是同步阻塞的,很少見到異步無阻塞的。而且得益于J2EE,以及Java上的三大框架(SSH)洗腦,大部分Java程序員都已經習慣了基于線程,線性的完成一個業務邏輯,很難讓他們接受一種將邏輯割裂的異步編程模型。
但是隨著異步無阻塞這股風氣起來,以及相關的 coroutine 語言Golang大力推廣,人們越來越知道如何更好的榨干CPU性能(讓CPU避免不必要的等待,減少上下文切換),阻塞的行為基本發生在I/O上,如果能有一個庫能把所有的I/O行為都包裝成異步阻塞的話,那么Quasar就會有用武之地,JVM上公認的是異步網絡通信庫是Netty,通過Netty基本解決了網絡I/O問題,另外還有一個是文件I/O,而這個JDK7提供的NIO2就可以滿足,通過 AsynchronousFileChannel 即可。剩下的就是如何將他們封裝成更友好的API了。目前能達到生產級別的這種異步工具庫,JVM上只有 Vert.x3 ,封裝了Netty4,封裝了 AsynchronousFileChannel ,而且Vert.x官方也出了一個相對應的封裝了 Quasar 的庫 vertx-sync 。
Quasar目前是由一家商業公司Parallel Universe控制著,且有自己的一套體系,包括Quasar-actor,Quasar-galaxy等各個模塊,但是Quasar-core是開源的,此外Quasar自己也通過Fiber封裝了很多的第三方庫,目前全都在comsat這個項目里。隨便找一個項目看看,你會發現其實通過Quasar的Fiber去封裝第三方的同步庫還是很簡單的。
寫在最后
異步無阻塞的編碼方式其實有很多種實現,比如node.js的提倡的Promise,對應到Java8的就是CompletableFuture。
另外事件響應式也算是一個比較流行的做法,比如ReactiveX系列,RxJava、Rxjs、RxSwift等。我個人覺得RxJava是一個非常好的函數式響應實現(JDK9會有對應的JDK實現),但是我們不能要求所有的程序員一眼就提煉出業務里的functor,monad(這些能力需要長期浸淫在函數式編程思想里),反而RxJava特別適合用在前端與用戶交互的部分,因為用戶的點擊滑動行為是一個個真實的事件流,這也是為什么RxJava在Android端非常火的原因,而后端基本上都是通過Rest請求過來,每一個請求其實已經限定了業務范圍,不會再有復雜的事件邏輯,所以基本上RxJava在Vert.x這端只是做了一堆的flatmap,再加上微服務化,所有的業務邏輯都已經做了最小的邊界,所以順序的同步的編碼方式更適合寫業務邏輯的后端程序員。
所以這里Golang開了個好頭,但是Golang也有其自身的限制,比如不支持泛型,當然這個仁者見仁智者見智了,包的依賴管理比較弱,此外Golang沒有線程池的概念,如果coroutine里的邏輯發生了阻塞,那么整個程序會hang死。而這點Vert.x提供了一個Worker Pool的概念,可以將需要耗時執行的邏輯包到線程池里面,執行完后異步返回給EventLoop線程。
下一篇我們來研究一下 vertx-sync ,讓vert.x里所有的異步編碼方式同步化,徹底解決 Vert.x 里的Callback Hell。