理解RxJava的線程模型
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,最初是LINQ的一個擴展,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流,Rx庫支持.NET、JavaScript和C++,Rx近幾年越來越流行了,現在已經支持幾乎全部的流行編程語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社區網站是 reactivex.io 。
Netflix參考微軟的Reactive Extensions創建了Java的實現RxJava,主要是為了簡化服務器端的并發。2013年二月份,Ben Christensen 和 Jafar Husain發在Netflix技術博客的一篇文章第一次向世界展示了RxJava。
RxJava也在Android開發中得到廣泛的應用。
ReactiveX
An API for asynchronous programming with observable streams.
A combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
雖然RxJava是為異步編程實現的庫,但是如果不清楚它的使用,或者錯誤地使用了它的線程調度,反而不能很好的利用它的異步編程提到系統的處理速度。本文通過實例演示錯誤的RxJava的使用,解釋RxJava的線程調度模型,主要介紹 Scheduler 、 observeOn 和 subscribeOn 的使用。
本文中的例子以并發發送http request請求為基礎,通過性能檢驗RxJava的線程調度。
第一個例子,性能超好?
我們首先看第一個例子:
publicstaticvoidtestRxJavaWithoutBlocking(intcount)throwsException {
CountDownLatch finishedLatch = newCountDownLatch(1);
longt = System.nanoTime();
Observable.range(0, count).map(i -> {
//System.out.println("A:" + Thread.currentThread().getName());
return200;
}).subscribe(statusCode -> {
//System.out.println("B:" + Thread.currentThread().getName());
}, error -> {
}, () -> {
finishedLatch.countDown();
});
finishedLatch.await();
t = (System.nanoTime() - t) / 1000000;//ms
System.out.println("RxJavaWithoutBlocking TPS: "+ count *1000/ t);
}
這個例子是一個基本的RxJava的使用,利用Range創建一個Observable, subscriber處理接收的數據。因為整個邏輯沒有阻塞,程序運行起來很快,
輸出結果為:
RxJavaWithoutBlocking TPS: 7692307 。
加上業務的模擬,性能超差
上面的例子是一個理想化的程序,沒雨任何阻塞。我們模擬一下實際的應用,加上業務處理。
業務邏輯是發送一個http的請求,httpserver是一個模擬器,針對每個請求有30毫秒的延遲。subscriber統計請求結果:
publicstaticvoidtestRxJavaWithBlocking(intcount)throwsException {
URL url = newURL("http://127.0.0.1:8999/");
CountDownLatch finishedLatch = newCountDownLatch(1);
longt = System.nanoTime();
Observable.range(0, count).map(i -> {
try{
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
intresponseCode = conn.getResponseCode();
BufferedReader in = newBufferedReader(newInputStreamReader(conn.getInputStream()));
String inputLine;
while((inputLine = in.readLine()) !=null) {
//response.append(inputLine);
}
in.close();
returnresponseCode;
} catch(Exception ex) {
return-1;
}
}).subscribe(statusCode -> {
}, error -> {
}, () -> {
finishedLatch.countDown();
});
finishedLatch.await();
t = (System.nanoTime() - t) / 1000000;//ms
System.out.println("RxJavaWithBlocking TPS: "+ count *1000/ t);
}
運行結果如下:
RxJavaWithBlocking TPS: 29 。
@#¥%%……&!
性能怎么突降呢,第一個例子看起來性能超好啊,http server只增加了一個30毫秒的延遲,導致這個方法每秒只能處理29個請求。
如果我們估算一下, 29*30= 870 毫秒,大約1秒,正好和單個線程發送處理所有的請求的TPS差不多。
后面我們也會看到,實際的確是一個線程處理的,你可以在代碼中加入
加上調度器,不起作用?
如果你對 subscribeOn 和 observeOn 方法有些印象的話,可能會嘗試使用調度器去解決:
publicstaticvoidtestRxJavaWithBlocking(intcount)throwsException {
URL url = newURL("http://127.0.0.1:8999/");
CountDownLatch finishedLatch = newCountDownLatch(1);
longt = System.nanoTime();
Observable.range(0, count).map(i -> {
try{
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
intresponseCode = conn.getResponseCode();
BufferedReader in = newBufferedReader(newInputStreamReader(conn.getInputStream()));
String inputLine;
while((inputLine = in.readLine()) !=null) {
//response.append(inputLine);
}
in.close();
returnresponseCode;
} catch(Exception ex) {
return-1;
}
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(statusCode -> {
}, error -> {
}, () -> {
finishedLatch.countDown();
});
finishedLatch.await();
t = (System.nanoTime() - t) / 1000000;//ms
System.out.println("RxJavaWithBlocking TPS: "+ count *1000/ t);
}
加上 .subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()) 看一下性能:
RxJavaWithBlocking TPS: 30 。
性能沒有改觀,是時候了解一下RxJava線程調度的問題了。
RxJava的線程模型
首先,依照 Observable Contract , onNext 是順序執行的,不會同時由多個線程并發執行。
默認情況下,它是在調用 subscribe 方法的那個線程中執行的。如第一個例子和第二個例子,Rx的操作和消息接收處理都是在同一個線程中執行的。一旦由阻塞,比如第二個例子,久會導致這個線程被阻塞,吞吐量下降。
但是 subscribeOn 可以改變Observable的運行線程。
上圖中可以看到,如果你使用了 subscribeOn 方法,則Rx的運行將會切換到另外的線程上,而不是默認的調用線程。
需要注意的是,如果在Observable鏈中調用了多個 subscribeOn 方法,無論調用點在哪里,Observable鏈只會使用第一個 subscribeOn 指定的調度器,正所謂"一見傾情"。
但是 onNext 還是順序執行的,所以第二個例子的性能依然低下。
observeOn 可以中途改變Observable鏈的線程。前面說了, subscribeOn 方法改變的源Observable的整個的運行線程,要想中途切換線程,就需要 observeOn 方法。
官方的一個簡略晦澀的解釋如下:
The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.
一圖勝千言:
注意箭頭的顏色和橫軸的顏色,不同的顏色代表不同的線程。
Schedulers
上面我們了解了RxJava可以使用 subscribeOn 和 observeOn 可以改變和切換線程,以及 onNext 是順序執行的,不是并發執行,至多也就切換到另外一個線程,如果它中間的操作是阻塞的,久會影響整個Rx的執行。
Rx是通過調度器來選擇哪個線程執行的,RxJava內置了幾種調度器,分別為不同的case提供線程:
-
io(): 這個調度器時用于I/O操作, 它可以增長或縮減來確定線程池的大小它是使用CachedThreadScheduler來實現的。需要注意的是,它的線程池是無限制的,如果你使用了大量的線程的話,可能會導致OutOfMemory等資源用盡的異常。
-
computation(): 這個是計算工作默認的調度器,它與I/O操作無關。它也是許多RxJava方法的默認調度器:buffer(),debounce(),delay(),interval(),sample(),skip()。
因為這些方法內部已經調用的調度器,所以你再調用 subscribeOn 是無效的,比如下面的例子總是使用 computation 調度器的線程。
Observable.just(1,2,3)
.delay(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.newThread())
.map(i -> {
System.out.println("map: "+ Thread.currentThread().getName());
returni;
})
.subscribe(i -> {});
- immediate() :這個調度器允許你立即在當前線程執行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默認的調度器。
- newThread() :創建一個新的線程只從。
- trampoline() :為當前線程建立一個隊列,將當前任務加入到隊列中依次執行。
同時, Schedulers 還提供了 from 靜態方法,用戶可以定制線程池:
ExecutorService es = Executors.newFixedThreadPool(200,newThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
Schedulers.from(es)
改造,異步執行
現在,我們已經了解了RxJava的線程運行,以及相關的調度器。可以看到上面的例子還是順序阻塞執行的,即使是切換到另外的線程上,依然是順序阻塞執行,顯示它的吞吐率非常非常的低。下一步我們就要改造這個例子,讓它能異步的執行。
下面是一種改造方案,我先把代碼貼出來,再解釋:
publicstaticvoidtestRxJavaWithFlatMap(intcount)throwsException {
ExecutorService es = Executors.newFixedThreadPool(200,newThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
URL url = newURL("http://127.0.0.1:8999/");
CountDownLatch finishedLatch = newCountDownLatch(1);
longt = System.nanoTime();
Observable.range(0, count).subscribeOn(Schedulers.io()).flatMap(i -> {
//System.out.println("A: " + Thread.currentThread().getName());
returnObservable.just(i).subscribeOn(Schedulers.from(es)).map(v -> {
//System.out.println("B: " + Thread.currentThread().getName());
try{
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
intresponseCode = conn.getResponseCode();
BufferedReader in = newBufferedReader(newInputStreamReader(conn.getInputStream()));
String inputLine;
while((inputLine = in.readLine()) !=null) {
//response.append(inputLine);
}
in.close();
returnresponseCode;
} catch(Exception ex) {
return-1;
}
}
);
}
).observeOn(Schedulers.computation()).subscribe(statusCode -> {
//System.out.println("C: " + Thread.currentThread().getName());
}, error -> {
}, () -> {
finishedLatch.countDown();
});
finishedLatch.await();
t = (System.nanoTime() - t) / 1000000;//ms
System.out.println("RxJavaWithFlatMap TPS: "+ count *1000/ t);
es.shutdownNow();
}
通過 flatmap 可以將源Observable的元素項轉成n個Observable,生成的每個Observable可以使用線程池并發的執行,同時flatmap還會將這n個Observable merge成一個Observable。你可以將其中的注釋打開,看看線程的執行情況。
性能還不錯:
RxJavaWithFlatMap TPS: 3906 。
FlatMap— transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
另一種解決方案
我們已經清楚了要并行執行提高吞吐率的解決辦法就是創建多個Observable并且并發執行。基于這種解決方案,我們還可以有其它的解決方案。
上一方案中利用flatmap創建多個Observable,針對我們的例子,我們何不直接創建多個Observable呢?
publicstaticvoidtestRxJavaWithParallel(intcount)throwsException {
ExecutorService es = Executors.newFixedThreadPool(200,newThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
URL url = newURL("http://127.0.0.1:8999/");
CountDownLatch finishedLatch = newCountDownLatch(count);
longt = System.nanoTime();
for(intk =0; k < count; k++) {
Observable.just(k).map(i -> {
//System.out.println("A: " + Thread.currentThread().getName());
try{
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
intresponseCode = conn.getResponseCode();
BufferedReader in = newBufferedReader(newInputStreamReader(conn.getInputStream()));
String inputLine;
while((inputLine = in.readLine()) !=null) {
//response.append(inputLine);
}
in.close();
returnresponseCode;
} catch(Exception ex) {
return-1;
}
}).subscribeOn(Schedulers.from(es)).observeOn(Schedulers.computation()).subscribe(statusCode -> {
}, error -> {
}, () -> {
finishedLatch.countDown();
});
}
finishedLatch.await();
t = (System.nanoTime() - t) / 1000000;//ms
System.out.println("RxJavaWithParallel TPS: "+ count *1000/ t);
es.shutdownNow();
}
性能更好一點:
RxJavaWithParallel2 TPS: 4716 。
這個例子沒有使用 Schedulers.io() 作為它的調度器,這是因為如果在大并發的情況下,可能會出現創建過多的線程導致資源不錯,所以我們限定使用200個線程。
總結
- subscribeOn() 改變的Observable運行(operate)使用的調度器,多次調用無效。
- observeOn() 改變Observable發送notifications的調度器,會影響后續的操作,可以多次調用
- 默認情況下, 操作鏈使用的線程是調用 subscribe() 的線程
- Schedulers 提供了多個調度器,可以并行運行多個Observable
- 使用RxJava可以實現異步編程,但是依然要小心線程阻塞。而且由于這種異步的編程,調試代碼可能更加的困難
參考文檔
- http://reactivex.io/documentation/contract.html
- http://reactivex.io/documentation/operators/subscribeon.html 中文翻譯
- http://reactivex.io/documentation/operators/observeon.html 中文翻譯
- http://reactivex.io/documentation/scheduler.html
- http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html
- http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html
- https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2 中文翻譯
- https://github.com/mcxiaoke/RxDocs
來自:http://colobu.com/2016/07/25/understanding-rxjava-thread-model/