RxJava 2.0 全新來襲

804696063 8年前發布 | 190K 次閱讀 RxJava

前言

之前寫RxJava相關文章的時候,就有人想讓我談談RxJava2.0的新特性,說實話,一開始我是拒絕的。因為在我看來,RxJava2.0雖然是版本的重大升級,但總歸還是RxJava,升級一個版本還能上天是咋的?了解一下它的更新文檔不就好了么?真的有必要單出一篇文章來談這個么?

但是詳細的了解了RxJava2.0以及部分源碼之后,我覺得還是有必要對RxJava2.0做一個說明,幫助大家對于RxJava有更好的認識。

鋪墊

假如你還不是很熟悉RxJava,或者對于背壓這個概念(2.0更新中會涉及到背壓的概念)很模糊,希望你也能讀一讀下面兩篇鋪墊的文章:

  • 關于RxJava最友好的文章
  • 關于RxJava最友好的文章----背壓

關于背壓的那篇文章本來是本文的一部分,因為篇幅過大,被剝離出去了,所以建議大家有時間也一并閱讀。

正文

RxJava2.0有很多的更新,一些改動甚至沖擊了我之前的文章里的內容,這也是我想寫這篇文章的原因之一。不過想要寫這篇文章其實也是有難度的,因為相關的資料去其實是很少的,還得自己硬著頭皮上....不過俗話說得好,有困難要上,沒有困難創造困難也要上。

在這里,我會按照我們之前關于RxJava的文章的講述順序:觀察者模式,操作符,線程調度,這三個方面依次看有哪些更新。

添加依賴

這個估計得放在最前面。

Android端使用RxJava需要依賴新的包名:

//RxJava的依賴包(我使用的最新版本)
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    //RxAndroid的依賴包
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

觀察者模式

首先聲明, RxJava以觀察者模式為骨架,在2.0中依然如此

不過此次更新中,出現了兩種觀察者模式:

  • Observable(被觀察者)/Observer(觀察者)
  • Flowable(被觀察者)/Subscriber(觀察者)

RxJava2.X中, Observeable用于訂閱Observer ,是不支持背壓的,而 Flowable用于訂閱Subscriber ,是支持背壓(Backpressure)的。

關于背壓這個概念以及它在1.0版本中的缺憾在上一篇文章中我已經介紹到了,如果你不是很清楚,我在這里在做一個介紹: 背壓是指在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略 ,在1.0中,關于背壓最大的遺憾,就是集中在Observable這個類中,導致有的Observable支持背壓,有的不支持。為了解決這種缺憾,新版本把支持背壓和不支持背壓的Observable區分開來。

Observable/Observer

Observable正常用法:

Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });

Observer mObserver=new Observer<Integer>() { //這是新加入的方法,在訂閱后發送數據之前, //回首先調用這個方法,而Disposable可用于取消訂閱 @Override public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Integer value) {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };

mObservable.subscribe(mObserver);</code></pre>

這種觀察者模型是不支持背壓的。

啥叫不支持背壓呢?

當被觀察者快速發送大量數據時,下游不會做其他處理,即使數據大量堆積,調用鏈也不會報MissingBackpressureException,消耗內存過大只會OOM

我在測試的時候,快速發送了100000個整形數據,下游延遲接收,結果被觀察者的數據全部發送出去了,內存確實明顯增加了,遺憾的是沒有OOM。

所以,當我們使用Observable/Observer的時候,我們需要考慮的是,數據量是不是很大(官方給出以1000個事件為分界線,僅供各位參考)

Flowable/Subscriber

Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //當訂閱后,會首先調用這個方法,其實就相當于onStart(),
            //傳入的Subscription s參數可以用于請求數據或者取消訂閱
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }

        @Override
        public void onNext(Integer o) {
            Log.w("TAG","onNext--->"+o);
            sub.request(1);
        }
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }
        @Override
        public void onComplete() {
            Log.w("TAG","onComplete");
        }
    });</code></pre> 

輸出如下:

onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->10
onComplete
onsubscribe end

Flowable是支持背壓的,也就是說,一般而言,上游的被觀察者會響應下游觀察者的數據請求,下游調用request(n)來告訴上游發送多少個數據。這樣避免了大量數據堆積在調用鏈上,使內存一直處于較低水平。

當然,Flowable也可以通過creat()來創建:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }
        //需要指定背壓策略
        , BackpressureStrategy.BUFFER);

Flowable雖然可以通過create()來創建,但是你必須指定背壓的策略,以保證你創建的Flowable是支持背壓的(這個在1.0的時候就很難保證,可以說RxJava2.0收緊了create()的權限)。

根據上面的代碼的結果輸出中可以看到,當我們調用subscription.request(n)方法的時候,不等onSubscribe()中后面的代碼執行,就會立刻執行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的類時,應當盡量在subscription.request(n)這個方法調用之前做好初始化的工作;

當然,這也不是絕對的,我在測試的時候發現,通過create()自定義Flowable的時候,即使調用了subscription.request(n)方法,也會等onSubscribe()方法中后面的代碼都執行完之后,才開始調用onNext。

TIPS: 盡可能確保在request()之前已經完成了所有的初始化工作,否則就有空指針的風險。

其他觀察者模式

當然,除了上面這兩種觀察者,還有一類觀察者

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

其實這三者都差不多,Maybe/MaybeObserver可以說是前兩者的復合體,因此以Maybe/MaybeObserver為例簡單介紹一下這種觀察者模式的用法

//判斷是否登陸
Maybe.just(isLogin())
    //可能涉及到IO操作,放在子線程
    .subscribeOn(Schedulers.newThread())
    //取回結果傳到主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

上面就是Maybe/MaybeObserver的普通用法,你可以看到,實際上,這種觀察者模式并不用于發送大量數據,而是發送單個數據,也就是說,當你只想要某個事件的結果(true or false)的時候,你可以用這種觀察者模式

這是上面那些被觀察者的上層接口:

//Observable接口
interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
    void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

其實我們可以看到,每一種觀察者都繼承自各自的接口,這也就把他們能完全的區分開,各自獨立(特別是Observable和Flowable),保證了他們各自的拓展或者配套的操作符不會相互影響。

例如flatMap操作符實現:

//Flowable中flatMap的定義
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

//Observable中flatMap的定義
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

假如你想為Flowable寫一個自定義的操作符,那么只要保證Function< Publisher >中的類型實現了Publisher接口即可。這么說可能很抽象,大家不理解其實也沒關系,因為并不推薦大家自定義操作符,RxJava中的操縱符的組合已經可以滿足大家的需求了。

當然,你也會注意到上面那些接口中的subscribe()方法的返回類型為void了,在1.X中,這個方法一般會返回一個Subscription對象,用于取消訂閱。現在,這個功能的對象已經被放到觀察者Observer或者subscriber的內部實現方法中了,

Flowable/Subscriber

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

上面的實例中,onSubscribe(Subscription s)傳入的參數s就肩負著取消訂閱的功能,當然,他也可以用于請求上游的數據。

在Observable/observer中,傳入的參數是另一個對象

Observable/Observer

public interface Observer<T> {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();
    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

在Observer接口中,onSubscribe(Disposable d)方法傳入的Disposable也是用于取消訂閱,基本功能是差不多的,只不過命名不一致,大家知道就好。

其實這種設計可以說還是符合邏輯的,因為取消訂閱這個動作就只有觀察者(Observer等)才能做的,現在把它并入到觀察者內部,也算順理成章吧。

最后再提一點更新,就是被觀察者不再接收null作為數據源了。

操作符相關

這一塊其實可以說沒什么改動,大部分之前你用過的操作符都沒變,即使有所變動,也只是包名或類名的改動。大家可能經常用到的就是Action和Function。

Action相關

之前我在文章里介紹過關于Action這類接口,在1.0中,這類接口是從Action0,Action1...往后排(數字代表可接受的參數),現在做出了改動

Rx1.0-----------Rx2.0

Action1--------Action

Action1--------Consumer

Action2--------BiConsumer

后面的Action都去掉了,只保留了ActionN

Function相關

同上,也是命名方式的改變

上面那兩個類, 和RxJava1.0相比,他們都增加了throws Exception,也就是說,在這些方法做某些操作就不需要try-catch 。

例如:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);

Files.readLines(name)這類io方法本來是需要try-catch的,現在直接拋出異常,就可以放心的使用lambda ,保證代碼的簡潔優美。

doOnCancel/doOnDispose/unsubscribeOn

以doOnCancel為例,大概就是當取消訂閱時,會調用這個方法,例如:

Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);

take新操符會取消后面那些還未被發送的事件,因而會觸發doOnCancel

其他的一些操作符基本沒變,或者只是改變了名字,在這里就不一一介紹了,需要提一下的是,很多操作符都有兩套,一套用于Observable,一套用于Flowable。

線程調度

可以說這一塊兒基本也沒有改動,如果一定要說的話。

  • 那就是去掉了Schedulers.immediate()這個線程環境
  • 移除的還有Schedulers.test()(我好像從來沒用過這個方法)
  • io.reactivex.Scheduler這個抽象類支持直接調度自定義線程任務(這個我也沒怎么用)

補充

如果你想把自己的RxJava1.0的遷移到2.0的版本,可以使用這個庫RxJava2Interop ,在github上可以找到,它可以在Rxjava1.0和2.0之間相互轉換,也就是說,不僅可以把1.0的代碼遷移到2.0,你還可以把2.0的代碼遷移到1.0,哈哈。

結尾

其實從整篇文章的分析來看,改動最大的還是觀察者模式的實現,被拆分和細化了,主要分成了Observable和Flowable兩大類,當然還有與之相關聯的其他變動, 總體來看這一版本可以說是對于觀察者和被觀察者的重構

RxJava2.0的范例代碼我沒精力去寫了,也正巧有位外國朋友已經寫了RxJava2.0的demo,下面是他的項目地址:

在github上搜索:RxJava2-Android-Samples

當然,學習2.0 的過程中有什么問題也可以在這里留言討論。

附錄

下面我截圖展示一下2.0相對于1.0的一些改動的細節,僅做參考。

其實這些都是官方給出的列表,截圖在這里只是方便大家觀摩。

 

來自:http://www.jianshu.com/p/220955eefc1f

 

 本文由用戶 804696063 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!