RxJava2 常用方法介紹
之前看了 拉丁吳 關于 RxJava 的一些文章,獲益匪淺可惜的是大佬并沒有對于各個運算符進行介紹,關于 RxJava2 也只給了一個 GitHub 上的 Demo RxJava2-Android-Samples ,這個工程里面的注釋不多,只能自己一個一個去試了,這種小例子感覺還是直接用 Java 來的方便些,于是自己就寫了一個 RxJava2Deme
這篇文章就是對這個例子的一個一個操作進行了簡單的說明,僅此而已為了簡化下面的代碼,先定義了一些復用的函數,有點多,就不貼上來了,大概就是 Observer<Integer> getObserver(int n) 這樣的。 文件地址
操作符
concat()
例子:
Observable.concat(Observable.just(1, 2, 3), Observable.just(5, 6, 7))
.subscribe(getObserver(0));
輸出:
0 onSubscribe
0 onNext:1
0 onNext:2
0 onNext:3
0 onNext:5
0 onNext:6
0 onNext:7
0 onComplete
很簡單的連接操作
distinct()
例子:
Observable.just(0, 1, 1, 2, 3)
.distinct()
.subscribe(getObserver(0));
輸出:
0 onSubscribe
0 onNext:0
0 onNext:1
0 onNext:2
0 onNext:3
0 onComplete
分析:就是很簡單的去重
filter()
例子:
Observable.just(0, 1, 3, 4)
.filter(i -> i > 2)
.subscribe(getObserver(0));
輸出:
0 onSubscribe
0 onNext:3
0 onNext:4
0 onComplete
分析:篩選也沒啥說的,大家都會了
buffer()
例子:
Observable<Integer> observable = Observable.just(1, 2 ,3, 4, 5);
observable.buffer(3, 2).subscribe(getListObserver(0));
輸出:
0list onSubscribe
0list onNext:[1, 2, 3]
0list onNext:[3, 4, 5]
0list onNext:[5]
0list onComplete
解析:
buffer(count, skip)` 從定義就差不多能看出作用了,將 observable 中的數據按 skip(步長)分成最長不超過 count 的 buffer,然后生成一個 observable
debounce()
例子:
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
Thread.sleep(1000);
e.onNext(2);
Thread.sleep(400);
e.onNext(3);
Thread.sleep(1000);
e.onNext(4);
Thread.sleep(400);
e.onNext(5);
Thread.sleep(400);
e.onNext(6);
Thread.sleep(1000);
e.onComplete();
}).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.debounce(500, TimeUnit.MILLISECONDS)
.subscribe(getObserver(0));
Thread.sleep(10000);
輸出:
0 onSubscribe
0 onNext:1
0 onNext:3
0 onNext:6
0 onComplete
分析:
這么理解,在一段時間內只能有一次 onNext()
看上面的例子,我們將時間間隔設置為了 500ms
onNext(1) -> sleep(1000) (因為 1000 > 500) 所以 onNext(1) 成功執行 -> onNext(2) -> sleep(400) (因為 400 < 1000 所以 onNext(2) 被除掉了
這樣依次下去,需要注意的只要寫了 onNext() 時間就會被重置,即使這個 onNext() 被除掉了
在上面這個例子中 4 和 5 隔了 400ms,所以 4 沒有觸發,但是時間得重新計算
defer()
例子:
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(0, 1, 2, 3);
}
});
observable.subscribe(getObserver(0));
observable.subscribe(getObserver(1));
輸出:
0 onSubscribe
0 onNext:0
0 onNext:1
0 onNext:2
0 onNext:3
0 onComplete
1 onSubscribe
1 onNext:0
1 onNext:1
1 onNext:2
1 onNext:3
1 onComplete
分析:
就是在每次訂閱的時候就會創建一個新的 Observable
interval()
例子:
Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
p("accept " + aLong + " at " + System.currentTimeMillis());
}
});
輸出:
accept 0 at 1487498704028
accept 1 at 1487498706028
accept 2 at 1487498708028
accept 3 at 1487498710028
accept 4 at 1487498712028
...
分析:
類似于一個定時任務這樣吧,也可以像下面這么寫:
Observable.interval(3, 2, TimeUtil.SECONDS);
Observable.just(1, 2, 3).subscribeWith(getDisposableObserver(1));
last()
例子:
Observable.just(0, 1, 2).last(3).subscribe(getSingleObserver(0));
輸出:
0 onSubscribe
0 onSuccess :2
分析:
就是取出最后一個值,參數是沒有值的時候的默認值,比如這樣:
Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).last(3)
.subscribe(getSingleObserver(0));
輸出就是 3 了
另見 lastOrError() 方法,區別就是 lastOrError() 沒有默認值,沒值就觸發錯誤
map()
例子:
Observable.just(0, 1, 2, 3)
.map(i -> "string" + i)
.subscribe(getStringObserver(0));
輸出:
0 onSubscribe
0 onNext:string0
0 onNext:string1
0 onNext:string2
0 onNext:string3
0 onComplete
分析:
把一個 Observable 轉成另一個 Observable
###
例子:
Observable.merge(Observable.just(0, 1), Observable.just(3, 4))
.subscribe(getObserver(0));
輸出:
0 onSubscribe
0 onNext:0
0 onNext:1
0 onNext:3
0 onNext:4
0 onComplete
分析:
將多個 Observable 合起來,例子中只是兩個
參數也支持使用迭代器將更多的組合起來
###
例子:
Observable.just(1, 2, 3)
.reduce((i1, i2) -> i1 + i2)
.subscribe(getMaybeObserver(0));
輸出:
0 onSubscribe
0 onSuccess 6
分析:
就是依次用一個方法處理每個值,可以有一個 seed 作為初始值
不指定 seed 則第一次傳入的就是第一第二個
###
例子:
PublishSubject<Integer> publishSubject = PublishSubject.create();
Observable<Integer> observable = publishSubject.replay().autoConnect();
observable.subscribe(getObserver(0));
publishSubject.onNext(0);
observable.subscribe(getObserver(1));
publishSubject.onNext(1);
publishSubject.onComplete();
輸出:
0 onSubscribe
0 onNext:0
1 onSubscribe
1 onNext:0
0 onNext:1
1 onNext:1
0 onComplete
1 onComplete
分析:
從結果可以很明顯的看出來,使用了 replay() 后, subscribe(getObserver(1)) 之前的數據也被傳入了
相對于 cache() 來說, replay() 提供了更多可控制的選項,在實際使用中可以通過指定 bufferSize 來防止內存占用過大等
還有一個 ReplaySubject 功能和這個應該是差不多的,下面就不單獨說了,代碼在 GitHub 上都有。
merge()
例子:
Observable.just(0, 1, 2, 3)
.scan((i1, i2) -> i1 + i2)
.subscribe(getObserver(0));
輸出:
0 onSubscribe
0 onNext:0
0 onNext:1
0 onNext:3
0 onNext:6
0 onComplete
分析:
scan() 和上面提到的 reduce() 差不多
區別在于 reduce() 只輸出最終結果,而 scan() 會將過程中的每一個結果輸出
skip()
例子:
Observable.just(0, 1, 2, 3)
.skip(2)
.subscribe(getObserver(0));
輸出:
0 onSubscribe
0 onNext:2
0 onNext:3
0 onComplete
分析:
看名字也知道是干嘛的了,跳過一些數據,例子中跳過的是數據量,也可以跳過時間 skip(long time, TimeUnit unit)
take()
例子:
Observable.just(0, 1, 2, 3)
.skip(2)
.subscribe(getObserver(0));
輸出:
0 onSubscribe
0 onNext:0
0 onNext:1
0 onComplete
分析:
從數據中取前幾個出來
和 skip() 類似,參數可以指定為時間,那就是取出這段時間里的數據了
sample() throttleFirst() throttleLast()
這幾個是相關的,一起說
例子:
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(0);
Thread.sleep(200);
e.onNext(1);
Thread.sleep(600);
e.onNext(2);
Thread.sleep(300);
e.onNext(3);
Thread.sleep(1100);
e.onNext(4);
Thread.sleep(3000);
e.onComplete();
}).sample(1, TimeUnit.SECONDS) // throttleFirst 和 throttleLast 就將這里的 sample 改掉就行
.subscribe(getObserver(0));
輸出:
sample:
0 onSubscribe
0 onNext:2
0 onNext:3
0 onNext:4
0 onComplete
throttleFirst:
0 onSubscribe
0 onNext:0
0 onNext:3
0 onNext:4
0 onComplete
throttleLast:
0 onSubscribe
0 onNext:2
0 onNext:3
0 onComplete
分析:
先從 sample() 開始吧,先看下官網上的定義
emit the most recent items emitted by an Observable within periodic time intervals
發出最接近周期點的事件
在例子中,我們使用了 1000 作為 時間間隔,隨手畫了張圖,將就著看下
在 A 點之前沒有點,B 點之前最近的一個點是 2,C 點之前的是 3,所以輸出 2 和 3。
對于 throttleFirst() ,它和 sample() 是相反的,只是要注意一下,圖中的 0 點是被當做在 A 點之后處理的
而 throttleLast() 就是 sample() 換了個名字而已
timer()
例子:
Observable.timer(1, TimeUnit.SECONDS)
.subscribe(getLongObserver(0));
輸出:
0 onSubscribe
0 onNext:0
0 onComplete
分析:
定時任務了
window()
例子:
Observable.interval(1, TimeUnit.SECONDS)
.take(9)
.window(3, TimeUnit.SECONDS)
.subscribe(new Consumer<Observable<Long>>() {
int n = 0 ;
@Override
public void accept(Observable<Long> longObservable) throws Exception {
longObservable.subscribe(getLongObserver(n++));
}
});
輸出:
0 onSubscribe
0 onNext:0
0 onNext:1
0 onComplete
1 onSubscribe
1 onNext:2
1 onNext:3
1 onNext:4
1 onComplete
2 onSubscribe
2 onNext:5
2 onComplete
分析:
按照時間劃分窗口,將數據發送給不同的 observable 。
zip()
例子:
Observable.zip((ObservableSource<Integer>) observer -> {
observer.onNext(1);
observer.onNext(2);
observer.onNext(3);
}, (ObservableSource<String>) observer -> {
observer.onNext("str");
observer.onNext("text");
observer.onComplete();
}, (integer, s) -> integer + s).subscribe(getStringObserver(0));
輸出:
0 onSubscribe
0 onNext:1str
0 onNext:2text
0 onComplete
分析:
功能是可以將不同的觀察者進行組合,并且 onNext() 是一對一對的,例子里的 3 就沒有對象了(3 真是慘 =.=)
然后只要有一個執行了 onComplete 就會結束掉
類
PublishSubject
例子:
publishSubject.subscribe(getObserver(0));
publishSubject.onNext(0);
publishSubject.subscribe(getObserver(1));
publishSubject.onNext(1);
publishSubject.onComplete();
輸出:
0 onSubscribe
0 onNext:0
1 onSubscribe
0 onNext:1
1 onNext:1
0 onComplete
1 onComplete
分析:
onNext() 會通知每個觀察者,僅此而已
AsyncSubject
例子:
AsyncSubject<Integer> subscriber = AsyncSubject.create();
subscriber.subscribe(getObserver(1));
subscriber.onNext(1);
subscriber.subscribe(getObserver(2));
subscriber.onNext(2);
subscriber.onComplete();
輸出:
Observer1onSubscribe
Observer2onSubscribe
Observer1onNext: 2
Observer1onComplete
Observer2onNext: 2
Observer2onCompelte
分析:
查看文檔,關于 AsyncObject 的介紹,在 調用 onComplete() 之前,除了 subscribe() 其它的操作都會被緩存,在調用 onComplete() 之后只有最后一個 onNext() 會生效
BehaviorObject
BehaviorSubject<Integer> subscriber = BehaviorSubject.create();
subscriber.subscribe(getObserver(1));
subscriber.onNext(0);
subscriber.onNext(1);
subscriber.subscribe(getObserver(2));
subscriber.onNext(4);
subscriber.onComplete();
輸出:
1 onSubscribe
1 onNext:0
1 onNext:1
2 onSubscribe
2 onNext:1
1 onNext:4
2 onNext:4
1 onComplete
2 onComplete
分析:
BehaviorSubject 的最后一次 onNext() 操作會被緩存,然后在 subscribe() 后立刻推給新注冊的 Observer
對于上面的例子 onNext(1) 這個操作會被緩存,在 subscribe(observer2) 之后會立刻傳入 onNext(1) 從而執行
Completable
先看下文檔里的說明:
Represents a deferred computation without any value but only indication for completion or exception. The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?
也就是說 Completable 是沒有 onNext 的,要么成功要么出錯,不關心過程,在 subscribe 后的某個時間點返回結果
例子:
Completablecompletable = Completable.timer(1000, TimeUnit.MILLISECONDS);
completable.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
p("onSubscribe " + System.currentTimeMillis());
}
@Override
public void onComplete() {
p("onComplete " + System.currentTimeMillis());
}
@Override
public void onError(Throwable e) {
p("onError" + e);
}
});
輸出:
onSubscribe 1487237357951
onComplete 1487237358952
Flowable
例子:
Flowable.just(0, 1, 2, 3)
.reduce(50, (a, b) -> a + b)
.subscribe(getSingleObserver(0));
輸出:
0 onSubscribe
0 onSuccess :56
分析:
Flowable 在 RxJava2 中是用來解決背壓問題的,至于具體的可以看前面推薦的文章,這里就不展開了。
CompositeDisposable
例子:
new CompositeDisposable().addAll(Observable.just(0, 1, 2, 3)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribeWith(getDisposableObserver(0)),
Observable.just(6, 7, 8, 9)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribeWith(getDisposableObserver(1)));
輸出:
0
6
1
7
java.lang.InterruptedException: sleepinterrupted
atjava....
java.lang.InterruptedException: sleepinterrupted
atjava...
分析:
就是一個 Disposable 的集合
就這么多了,應該常用的操作符都差不多有了。
隨便寫的,如果有什么地方沒說清楚歡迎留言,我會盡快回復。
順便求波贊和星,謝謝。
來自:https://blog.xujifa.cn/index.php/2017/02/20/rxjava2_introduction/