ReactiveX框架(基于RxJava)實現原理淺析

AudryXHPV 8年前發布 | 16K 次閱讀 RxJava 安卓開發 Java開發

來自: http://blog.kifile.com/reactivex/2015/12/07/rx_introduce.html

前言

先簡單介紹一下 ReactiveX.

ReactiveX 并不特指某種編程語言,他應該算是一種編程思維,反應式編程.

反應式編程的核心在于,當觸發特定行為邏輯后(對于ReactiveX而言,就是調用了 subscribe 指令),根據進行指定操作,并根據操作結果執行特定操作. 這種編程思維特別適合用于交互式軟件上,例如Android,iOS,通常用戶觸發某個條件(比如說點擊操作)后,我們需要根據用戶的操作行為, 可能接下來要執行一系列操作,最后再根據操作結果在ui界面上呈現給用戶.而ReactiveX 為我們提供了這種交互流程的封裝.

下面以RxJava為例,分析一下ReactiveX的框架實現原理.

RxJava執行流程

首先奉上一個最簡單的 ReactiveX 的代碼實現.

 1 Observable.create(new Observable.OnSubscribe<String>() {
 2     @Override
 3     public void call(Subscriber<? super String> subscriber) {
 4         subscriber.onNext("Sample");
 5         subscriber.onCompleted();
 6     }
 7 }).subscribe(new Observer<String>() {
 8     @Override
 9     public void onCompleted() {
10         System.out.println("Complete");
11     }
12 
13     @Override
14     public void onError(Throwable e) {
15         e.printStackTrace();
16     }
17 
18     @Override
19     public void onNext(String s) {
20         System.out.println("Get:" + s);
21     }
22 });

運行上面的代碼,我們可以看到以下輸出.

Get:Sample
Complete

代碼執行完畢,讓我們看看整個流程的實現邏輯.

 1 private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 2     ...
 3     subscriber.onStart();
 4     ...
 5     try {
 6         hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
 7         return hook.onSubscribeReturn(subscriber);
 8     } catch (Throwable e) {
 9         Exceptions.throwIfFatal(e);
10         try {
11             subscriber.onError(hook.onSubscribeError(e));
12         } catch (Throwable e2) {
13             Exceptions.throwIfFatal(e2);
14             // if this happens it means the onError itself failed (perhaps an invalid function implementation)
15             // so we are unable to propagate the error correctly and will just throw
16             RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
17             // TODO could the hook be the cause of the error in the on error handling.
18             hook.onSubscribeError(r);
19             // TODO why aren't we throwing the hook's return value.
20             throw r;
21         }
22         return Subscriptions.unsubscribed();
23     }
24 }

在上面的代碼里,會發現一個 hook 對象,這是個什么鬼?

追蹤一下,發現原來他是一個RxJava每個方法都會返回一個Observable對象ExecutionHook對象,類圖如下:

可以看出,RxJavaObservableExecutionHook中針對RxJava的subscribe流程進行注入,方便自己更改相關邏輯, 當然對于默認的RxJavaObservableExecutionHook,你會發現他并沒有做任何處理,如果你想自己實現可以調用 RxJavaPlugin.getInstance() 設置自定義Hook.

看完上面的例子,給人感覺挺簡單地啊,而且比較類似Android中的AsyncTask,都是屬于執行任務后進行回調,那他相比AsyncTask有什么優勢嗎?

Operator

雖然從上面的例子中,看起來RxJava的確其貌不揚,但是ReactiveX也不止這點技法.

為了擴展ReactiveX的相關屬性,在 RxJava 中使用代理模式實現了很多有用的邏輯,例如類型轉換,遍歷數據個數限制,定時響應等額外特性.

這些邏輯被稱為操作(Operator),每一個Operator都繼承了Func1(也就是內部會有一個call()方法),RxJava框架會調用lift方法將Operator包裝成為Observable:

 1 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
 2     return new Observable<R>(new OnSubscribe<R>() {
 3         @Override
 4         public void call(Subscriber<? super R> o) {
 5             try {
 6                 Subscriber<? super T> st = hook.onLift(operator).call(o);
 7                 try {
 8                     // new Subscriber created and being subscribed with so 'onStart' it
 9                     st.onStart();
10                     onSubscribe.call(st);
11                 } catch (Throwable e) {
12                     // localized capture of errors rather than it skipping all operators
13                     // and ending up in the try/catch of the subscribe method which then
14                     // prevents onErrorResumeNext and other similar approaches to error handling
15                     Exceptions.throwIfFatal(e);
16                     st.onError(e);
17                 }
18             } catch (Throwable e) {
19                 Exceptions.throwIfFatal(e);
20                 // if the lift function failed all we can do is pass the error to the final Subscriber
21                 // as we don't have the operator available to us
22                 o.onError(e);
23             }
24         }
25     });
26 }

如果想了解更多詳細的操作信息,可以點擊這里: Operators

其實這里有個問題,既然大部分的Operator都需要這樣封裝,為什么不直接讓Operator對象繼承OnSubscribe對象,進而減少方法調用層級?

如果是為了防止OnSubscribe和Func1的方法重名,那么更改函數名就好了啊? 如果為了進行onLift回調,也可以在新類中增加回調調用位置啊?

同時為了避免代碼冗余,對于這些方法,RxJava都使用了構造者模式的一種變體,每個方法都會返回一個Observable對象,保證其能夠形成類似下面這樣的操作鏈.

 1 Observable.just("Hello", "Operator", "Chain").map(s -> s + " map" )
 2     .buffer(2).take(1).subscribe(new Subscriber<List<String>>() {
 3         @Override
 4         public void onCompleted() {
 5             System.out.println("Complete");
 6         }
 7 
 8         @Override
 9         public void onError(Throwable e) {
10 
11         }
12 
13         @Override
14         public void onNext(List<String> strings) {
15             System.out.println("Get:" + strings);
16         }
17 });

輸出結果如下:

Get:[Hello map, Operator map]
Complete

可以看到,我們輸入了三個String對象,但是只一次輸入了兩個String,并且兩個String 都額外多了一個 map 后綴,當然這也是我想要的結果

使用Single替代Observable

其實在絕大部分的使用場景中,用戶觸發操作后,對于我們而言返回結果其實只有成功失敗兩種,而Observable中是有 onNext , onComplete , onError 三種狀態,這樣看來似乎不太滿足需求.

當然ReactiveX中也考慮到了這種情況,它在Observable的基礎上衍生出了Single類,這個類的實現機制同Observable近乎相同,只是不過訂閱他的不再是 Subscriber 對象,而是 SingleSubscriber .

在 SingleSubscriber 中有 onSuccess 和 onError 兩種結果狀態,并且只會調用其中一個,恰好滿足我們的需求.

使用Scheduler進行多線程調用

按照上面所說,不論是Observable還是Single,其實都是在同一個線程中,不斷按照執行邏輯執行代碼指令,也就是說他始終是在同一線程中進行執行的.

但是有時候,我們希望能夠在異步線程中執行耗時操作,避免ui堵塞,這時候ReactiveX就為我們提供了 Scheduler 類.

Scheduler類其實并不負責異步線程處理,他只負責通過 createWorker() 類創建出一個 Worker 對象,真正負責任務的延時處理.

每一個Scheduler類,都會實現自己的Worker類,用于執行Scheduler任務.

我們可以使用ReactiveX中的 subscribeOn 和 observeOn 兩個方法,分別設置獲取數據的操作和分發消息的操作的執行Scheduler,從而實現數據的異步處理.

subscribeOn 和 observeOn 其實都是構建一個Operator對象,在call方法里,使用線程執行數據獲取和分發操作.

總結

其實ReactiveX就是一個針對觀察者模式的擴展,如果忽略掉ReactiveX框架為我們實現各種的Operator,那么它就是一個簡單的設計模式而已.

單就這一點而言,Android的AsyncTask和LoaderManager框架是勝過ReactiveX的,因為他針對Android的生命周期做了處理.

但由于ReactiveX中增加了很多Operator,能夠很方便的幫助我們對響應式任務進行操作,不論是類型轉換還是異步執行.

反而導致看起來ReactiveX中的比AsyncTask要好用,但我覺得如果吧ReactiveX結合到AsyncTask和LoaderManager中應該會更加的完美.

</code></code></code></code></code></code></div>

 本文由用戶 AudryXHPV 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!