ReactiveX框架(基于RxJava)實現原理淺析
來自: http://blog.kifile.com/reactivex/2015/12/07/rx_introduce.html
前言
先簡單介紹一下 ReactiveX.
ReactiveX 并不特指某種編程語言,他應該算是一種編程思維,反應式編程.
反應式編程的核心在于,當觸發特定行為邏輯后(對于ReactiveX而言,就是調用了 subscribe 指令),根據進行指定操作,并根據操作結果執行特定操作. 這種編程思維特別適合用于交互式軟件上,例如Android,iOS,通常用戶觸發某個條件(比如說點擊操作)后,我們需要根據用戶的操作行為, 可能接下來要執行一系列操作,最后再根據操作結果在ui界面上呈現給用戶.而ReactiveX 為我們提供了這種交互流程的封裝.
下面以RxJava為例,分析一下ReactiveX的框架實現原理.
RxJava執行流程
首先奉上一個最簡單的 ReactiveX 的代碼實現.
1 Observable.create(new Observable.OnSubscribe<String>() {
2 @Override
3 public void call(Subscriber<? super String> subscriber) {
4 subscriber.onNext("Sample");
5 subscriber.onCompleted();
6 }
7 }).subscribe(new Observer<String>() {
8 @Override
9 public void onCompleted() {
10 System.out.println("Complete");
11 }
12
13 @Override
14 public void onError(Throwable e) {
15 e.printStackTrace();
16 }
17
18 @Override
19 public void onNext(String s) {
20 System.out.println("Get:" + s);
21 }
22 });
運行上面的代碼,我們可以看到以下輸出.
Get:Sample
Complete
代碼執行完畢,讓我們看看整個流程的實現邏輯.
1 private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
2 ...
3 subscriber.onStart();
4 ...
5 try {
6 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
7 return hook.onSubscribeReturn(subscriber);
8 } catch (Throwable e) {
9 Exceptions.throwIfFatal(e);
10 try {
11 subscriber.onError(hook.onSubscribeError(e));
12 } catch (Throwable e2) {
13 Exceptions.throwIfFatal(e2);
14 // if this happens it means the onError itself failed (perhaps an invalid function implementation)
15 // so we are unable to propagate the error correctly and will just throw
16 RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
17 // TODO could the hook be the cause of the error in the on error handling.
18 hook.onSubscribeError(r);
19 // TODO why aren't we throwing the hook's return value.
20 throw r;
21 }
22 return Subscriptions.unsubscribed();
23 }
24 }
在上面的代碼里,會發現一個 hook 對象,這是個什么鬼?
追蹤一下,發現原來他是一個RxJava每個方法都會返回一個Observable對象ExecutionHook對象,類圖如下:
可以看出,RxJavaObservableExecutionHook中針對RxJava的subscribe流程進行注入,方便自己更改相關邏輯, 當然對于默認的RxJavaObservableExecutionHook,你會發現他并沒有做任何處理,如果你想自己實現可以調用 RxJavaPlugin.getInstance() 設置自定義Hook.
看完上面的例子,給人感覺挺簡單地啊,而且比較類似Android中的AsyncTask,都是屬于執行任務后進行回調,那他相比AsyncTask有什么優勢嗎?
Operator
雖然從上面的例子中,看起來RxJava的確其貌不揚,但是ReactiveX也不止這點技法.
為了擴展ReactiveX的相關屬性,在 RxJava 中使用代理模式實現了很多有用的邏輯,例如類型轉換,遍歷數據個數限制,定時響應等額外特性.
這些邏輯被稱為操作(Operator),每一個Operator都繼承了Func1(也就是內部會有一個call()方法),RxJava框架會調用lift方法將Operator包裝成為Observable:
1 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
2 return new Observable<R>(new OnSubscribe<R>() {
3 @Override
4 public void call(Subscriber<? super R> o) {
5 try {
6 Subscriber<? super T> st = hook.onLift(operator).call(o);
7 try {
8 // new Subscriber created and being subscribed with so 'onStart' it
9 st.onStart();
10 onSubscribe.call(st);
11 } catch (Throwable e) {
12 // localized capture of errors rather than it skipping all operators
13 // and ending up in the try/catch of the subscribe method which then
14 // prevents onErrorResumeNext and other similar approaches to error handling
15 Exceptions.throwIfFatal(e);
16 st.onError(e);
17 }
18 } catch (Throwable e) {
19 Exceptions.throwIfFatal(e);
20 // if the lift function failed all we can do is pass the error to the final Subscriber
21 // as we don't have the operator available to us
22 o.onError(e);
23 }
24 }
25 });
26 }
如果想了解更多詳細的操作信息,可以點擊這里: Operators
其實這里有個問題,既然大部分的Operator都需要這樣封裝,為什么不直接讓Operator對象繼承OnSubscribe對象,進而減少方法調用層級?
如果是為了防止OnSubscribe和Func1的方法重名,那么更改函數名就好了啊? 如果為了進行onLift回調,也可以在新類中增加回調調用位置啊?
同時為了避免代碼冗余,對于這些方法,RxJava都使用了構造者模式的一種變體,每個方法都會返回一個Observable對象,保證其能夠形成類似下面這樣的操作鏈.
1 Observable.just("Hello", "Operator", "Chain").map(s -> s + " map" )
2 .buffer(2).take(1).subscribe(new Subscriber<List<String>>() {
3 @Override
4 public void onCompleted() {
5 System.out.println("Complete");
6 }
7
8 @Override
9 public void onError(Throwable e) {
10
11 }
12
13 @Override
14 public void onNext(List<String> strings) {
15 System.out.println("Get:" + strings);
16 }
17 });
輸出結果如下:
Get:[Hello map, Operator map]
Complete
可以看到,我們輸入了三個String對象,但是只一次輸入了兩個String,并且兩個String 都額外多了一個 map 后綴,當然這也是我想要的結果
使用Single替代Observable
其實在絕大部分的使用場景中,用戶觸發操作后,對于我們而言返回結果其實只有成功失敗兩種,而Observable中是有 onNext , onComplete , onError 三種狀態,這樣看來似乎不太滿足需求.
當然ReactiveX中也考慮到了這種情況,它在Observable的基礎上衍生出了Single類,這個類的實現機制同Observable近乎相同,只是不過訂閱他的不再是 Subscriber 對象,而是 SingleSubscriber .
在 SingleSubscriber 中有 onSuccess 和 onError 兩種結果狀態,并且只會調用其中一個,恰好滿足我們的需求.
使用Scheduler進行多線程調用
按照上面所說,不論是Observable還是Single,其實都是在同一個線程中,不斷按照執行邏輯執行代碼指令,也就是說他始終是在同一線程中進行執行的.
但是有時候,我們希望能夠在異步線程中執行耗時操作,避免ui堵塞,這時候ReactiveX就為我們提供了 Scheduler 類.
Scheduler類其實并不負責異步線程處理,他只負責通過 createWorker() 類創建出一個 Worker 對象,真正負責任務的延時處理.
每一個Scheduler類,都會實現自己的Worker類,用于執行Scheduler任務.
我們可以使用ReactiveX中的 subscribeOn 和 observeOn 兩個方法,分別設置獲取數據的操作和分發消息的操作的執行Scheduler,從而實現數據的異步處理.
subscribeOn 和 observeOn 其實都是構建一個Operator對象,在call方法里,使用線程執行數據獲取和分發操作.
總結
其實ReactiveX就是一個針對觀察者模式的擴展,如果忽略掉ReactiveX框架為我們實現各種的Operator,那么它就是一個簡單的設計模式而已.
單就這一點而言,Android的AsyncTask和LoaderManager框架是勝過ReactiveX的,因為他針對Android的生命周期做了處理.
但由于ReactiveX中增加了很多Operator,能夠很方便的幫助我們對響應式任務進行操作,不論是類型轉換還是異步執行.
反而導致看起來ReactiveX中的比AsyncTask要好用,但我覺得如果吧ReactiveX結合到AsyncTask和LoaderManager中應該會更加的完美.
</code></code></code></code></code></code></div>