RxJava 2.0 全新來襲
前言
之前寫RxJava相關文章的時候,就有人想讓我談談RxJava2.0的新特性,說實話,一開始我是拒絕的。因為在我看來,RxJava2.0雖然是版本的重大升級,但總歸還是RxJava,升級一個版本還能上天是咋的?了解一下它的更新文檔不就好了么?真的有必要單出一篇文章來談這個么?
但是詳細的了解了RxJava2.0以及部分源碼之后,我覺得還是有必要對RxJava2.0做一個說明,幫助大家對于RxJava有更好的認識。
鋪墊
假如你還不是很熟悉RxJava,或者對于背壓這個概念(2.0更新中會涉及到背壓的概念)很模糊,希望你也能讀一讀下面兩篇鋪墊的文章:
- 關于RxJava最友好的文章
- 關于RxJava最友好的文章----背壓
關于背壓的那篇文章本來是本文的一部分,因為篇幅過大,被剝離出去了,所以建議大家有時間也一并閱讀。
正文
RxJava2.0有很多的更新,一些改動甚至沖擊了我之前的文章里的內容,這也是我想寫這篇文章的原因之一。不過想要寫這篇文章其實也是有難度的,因為相關的資料去其實是很少的,還得自己硬著頭皮上....不過俗話說得好,有困難要上,沒有困難創造困難也要上。
在這里,我會按照我們之前關于RxJava的文章的講述順序:觀察者模式,操作符,線程調度,這三個方面依次看有哪些更新。
添加依賴
這個估計得放在最前面。
Android端使用RxJava需要依賴新的包名:
//RxJava的依賴包(我使用的最新版本)
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
//RxAndroid的依賴包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
觀察者模式
首先聲明, RxJava以觀察者模式為骨架,在2.0中依然如此 。
不過此次更新中,出現了兩種觀察者模式:
- Observable(被觀察者)/Observer(觀察者)
- Flowable(被觀察者)/Subscriber(觀察者)
RxJava2.X中, Observeable用于訂閱Observer ,是不支持背壓的,而 Flowable用于訂閱Subscriber ,是支持背壓(Backpressure)的。
關于背壓這個概念以及它在1.0版本中的缺憾在上一篇文章中我已經介紹到了,如果你不是很清楚,我在這里在做一個介紹: 背壓是指在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略 ,在1.0中,關于背壓最大的遺憾,就是集中在Observable這個類中,導致有的Observable支持背壓,有的不支持。為了解決這種缺憾,新版本把支持背壓和不支持背壓的Observable區分開來。
Observable/Observer
Observable正常用法:
Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
Observer mObserver=new Observer<Integer>() {
//這是新加入的方法,在訂閱后發送數據之前,
//回首先調用這個方法,而Disposable可用于取消訂閱
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mObservable.subscribe(mObserver);</code></pre>
這種觀察者模型是不支持背壓的。
啥叫不支持背壓呢?
當被觀察者快速發送大量數據時,下游不會做其他處理,即使數據大量堆積,調用鏈也不會報MissingBackpressureException,消耗內存過大只會OOM
我在測試的時候,快速發送了100000個整形數據,下游延遲接收,結果被觀察者的數據全部發送出去了,內存確實明顯增加了,遺憾的是沒有OOM。
所以,當我們使用Observable/Observer的時候,我們需要考慮的是,數據量是不是很大(官方給出以1000個事件為分界線,僅供各位參考)
Flowable/Subscriber
Flowable.range(0,10)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
//當訂閱后,會首先調用這個方法,其實就相當于onStart(),
//傳入的Subscription s參數可以用于請求數據或者取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.w("TAG","onsubscribe start");
sub=s;
sub.request(1);
Log.w("TAG","onsubscribe end");
}
@Override
public void onNext(Integer o) {
Log.w("TAG","onNext--->"+o);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("TAG","onComplete");
}
});</code></pre>
輸出如下:
onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->10
onComplete
onsubscribe end
Flowable是支持背壓的,也就是說,一般而言,上游的被觀察者會響應下游觀察者的數據請求,下游調用request(n)來告訴上游發送多少個數據。這樣避免了大量數據堆積在調用鏈上,使內存一直處于較低水平。
當然,Flowable也可以通過creat()來創建:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}
//需要指定背壓策略
, BackpressureStrategy.BUFFER);
Flowable雖然可以通過create()來創建,但是你必須指定背壓的策略,以保證你創建的Flowable是支持背壓的(這個在1.0的時候就很難保證,可以說RxJava2.0收緊了create()的權限)。
根據上面的代碼的結果輸出中可以看到,當我們調用subscription.request(n)方法的時候,不等onSubscribe()中后面的代碼執行,就會立刻執行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的類時,應當盡量在subscription.request(n)這個方法調用之前做好初始化的工作;
當然,這也不是絕對的,我在測試的時候發現,通過create()自定義Flowable的時候,即使調用了subscription.request(n)方法,也會等onSubscribe()方法中后面的代碼都執行完之后,才開始調用onNext。
TIPS: 盡可能確保在request()之前已經完成了所有的初始化工作,否則就有空指針的風險。
其他觀察者模式
當然,除了上面這兩種觀察者,還有一類觀察者
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
其實這三者都差不多,Maybe/MaybeObserver可以說是前兩者的復合體,因此以Maybe/MaybeObserver為例簡單介紹一下這種觀察者模式的用法
//判斷是否登陸
Maybe.just(isLogin())
//可能涉及到IO操作,放在子線程
.subscribeOn(Schedulers.newThread())
//取回結果傳到主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上面就是Maybe/MaybeObserver的普通用法,你可以看到,實際上,這種觀察者模式并不用于發送大量數據,而是發送單個數據,也就是說,當你只想要某個事件的結果(true or false)的時候,你可以用這種觀察者模式
這是上面那些被觀察者的上層接口:
//Observable接口
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
其實我們可以看到,每一種觀察者都繼承自各自的接口,這也就把他們能完全的區分開,各自獨立(特別是Observable和Flowable),保證了他們各自的拓展或者配套的操作符不會相互影響。
例如flatMap操作符實現:
//Flowable中flatMap的定義
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
//Observable中flatMap的定義
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
假如你想為Flowable寫一個自定義的操作符,那么只要保證Function< Publisher >中的類型實現了Publisher接口即可。這么說可能很抽象,大家不理解其實也沒關系,因為并不推薦大家自定義操作符,RxJava中的操縱符的組合已經可以滿足大家的需求了。
當然,你也會注意到上面那些接口中的subscribe()方法的返回類型為void了,在1.X中,這個方法一般會返回一個Subscription對象,用于取消訂閱。現在,這個功能的對象已經被放到觀察者Observer或者subscriber的內部實現方法中了,
Flowable/Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
上面的實例中,onSubscribe(Subscription s)傳入的參數s就肩負著取消訂閱的功能,當然,他也可以用于請求上游的數據。
在Observable/observer中,傳入的參數是另一個對象
Observable/Observer
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
在Observer接口中,onSubscribe(Disposable d)方法傳入的Disposable也是用于取消訂閱,基本功能是差不多的,只不過命名不一致,大家知道就好。
其實這種設計可以說還是符合邏輯的,因為取消訂閱這個動作就只有觀察者(Observer等)才能做的,現在把它并入到觀察者內部,也算順理成章吧。
最后再提一點更新,就是被觀察者不再接收null作為數據源了。
操作符相關
這一塊其實可以說沒什么改動,大部分之前你用過的操作符都沒變,即使有所變動,也只是包名或類名的改動。大家可能經常用到的就是Action和Function。
Action相關
之前我在文章里介紹過關于Action這類接口,在1.0中,這類接口是從Action0,Action1...往后排(數字代表可接受的參數),現在做出了改動
Rx1.0-----------Rx2.0
Action1--------Action
Action1--------Consumer
Action2--------BiConsumer
后面的Action都去掉了,只保留了ActionN
Function相關
同上,也是命名方式的改變
上面那兩個類, 和RxJava1.0相比,他們都增加了throws Exception,也就是說,在這些方法做某些操作就不需要try-catch 。
例如:
Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
Files.readLines(name)這類io方法本來是需要try-catch的,現在直接拋出異常,就可以放心的使用lambda ,保證代碼的簡潔優美。
doOnCancel/doOnDispose/unsubscribeOn
以doOnCancel為例,大概就是當取消訂閱時,會調用這個方法,例如:
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);
take新操符會取消后面那些還未被發送的事件,因而會觸發doOnCancel
其他的一些操作符基本沒變,或者只是改變了名字,在這里就不一一介紹了,需要提一下的是,很多操作符都有兩套,一套用于Observable,一套用于Flowable。
線程調度
可以說這一塊兒基本也沒有改動,如果一定要說的話。
- 那就是去掉了Schedulers.immediate()這個線程環境
- 移除的還有Schedulers.test()(我好像從來沒用過這個方法)
- io.reactivex.Scheduler這個抽象類支持直接調度自定義線程任務(這個我也沒怎么用)
補充
如果你想把自己的RxJava1.0的遷移到2.0的版本,可以使用這個庫RxJava2Interop ,在github上可以找到,它可以在Rxjava1.0和2.0之間相互轉換,也就是說,不僅可以把1.0的代碼遷移到2.0,你還可以把2.0的代碼遷移到1.0,哈哈。
結尾
其實從整篇文章的分析來看,改動最大的還是觀察者模式的實現,被拆分和細化了,主要分成了Observable和Flowable兩大類,當然還有與之相關聯的其他變動, 總體來看這一版本可以說是對于觀察者和被觀察者的重構 。
RxJava2.0的范例代碼我沒精力去寫了,也正巧有位外國朋友已經寫了RxJava2.0的demo,下面是他的項目地址:
在github上搜索:RxJava2-Android-Samples
當然,學習2.0 的過程中有什么問題也可以在這里留言討論。
附錄
下面我截圖展示一下2.0相對于1.0的一些改動的細節,僅做參考。








其實這些都是官方給出的列表,截圖在這里只是方便大家觀摩。
來自:http://www.jianshu.com/p/220955eefc1f