RxWeekend——RxJava周末狂歡
作者地址: MrFu Blog--RxWeekend
周五的時候就打算這個周末就看 RxJava 了,于是利用一個周末的時間把咖啡變成了文字,對,就是咖啡,不是啤酒和炸雞,周六把 RxJava Essentials 英文版再看了一遍,順便看了一遍翻譯版,周日把小鄧子的博客以及他引述的其他文章全部看了一遍。
Part1 部分主要是 RxJava Essentials 的操作符
Part2 部分主要是一些 tips
對于Part1我更建議你先去看 RxJava Essentials 這本書,再回過頭來看這部分。我這里的解釋可能是非常抽象的,都是一些總結性的解釋。這里有一個實例,和 Tips7 有關:RxFace,喜歡就 star,不要猶豫 ^^
Part 1: RxJava Essentials -- Operators
Basic
-
just()方法可以傳入1到9個參數,它們會按照傳入的參數的順序來發射它們。 -
Observable.empty()需要一個 Oservable 但是什么都不發射 -
Observable.never()傳一個不發射數據并永遠不會結束的 Observable -
Observable.throw()創建一個不發射數據并且以錯誤結束的 Observable -
repeat() -
defer()在觀察者訂閱時創建 Observable,而不是創建后立即執行,這篇文章有著更棒的解釋:小鄧子:使用RxJava實現延遲訂閱 -
range()從一個指定的數字開始發射 N 個數字 -
interval(3, TimeUnit.SECONDES)輪詢時用:參數:指定兩次發射時間間隔,時間單位。 -
timer()一段時間后才發射 Observable
Filtering
-
filter(),take(),takeLast() -
distinct()去掉序列中重復項,是作用于一個完整的序列的 -
distinctUntilChanged()在一個存在的序列上來創建一個新的不重復發射元素的序列
-
first(),last(),firstOrDefault(),lastOrDefault() -
skip(),skipLast()跳過前幾個或者最后幾個元素 -
elementAt()發射指定元素。但如果元素不足可以使用:elementAtOrDefault() -
sample(30,TimeUnit.SECONDS)指定的時間間隔里發射最近一次的數值
-
throttleFirst()定時發射第一個元素 -
timeout()限時,在指定時間間隔 Observable 不發射值的話, 就會觸發onError()函數 -
debounce()過濾發射速率過快的數據,即:在一個時間間隔過去之后,仍然沒有發射的話,則發射最后的那個
Transforming
-
map()接收到的對象應用到每個發射的值上 -
flatMap()將發射的序列轉換成另外一種對象的 Observable 序列,注意:它允許交叉,即flatMap()不保證最終生成的 Observable 和源 Observable 發射序列相同。 FlatMap -
concatMap()解決了flatMap()交叉的問題,提供了 能把發射值連續在一起的鋪平函數,而非合并它們。
關于
flatMap()和concatMap()必須看這篇文章: 小鄧子-RxJava變換操作符:.concatMap( )與.flatMap( )的比較
-
flatMapInterable()類似于flatMap()只是它將源數據兩兩結成對并生成 Iterable,而不是原始數據項和生成的 Observables -
switchMap()和flatMap()區別在于每當源 Observable 發射一個新的數據項時,將取消訂閱并停止監視之前那個數據項產生的 Observable,并開始監視當前發射的這個。 -
scan()累加器,對原始Observable 發射的每項數據都應用一個函數,計算出函數的結果值,并填充回可觀測序列,等待下一次發射的數據一起使用。 -
scan(R, Func2)用初始值作為第一個發射的值 -
groupBy()引用小鄧子的一段話來說是這樣的:去這里看更詳細的解釋,會恍然大悟的:小鄧子-Architecting Android with RxJava
將原始Observable根據不同的key分組成多個
GroupedObservable,由原始Observable發射(原始Observable的泛型將變成這樣Observable<GroupedObservable<K, T>>),每一個GroupedObservable既是事件本身也是一個獨立的Observable,每一個GroupedObservable發射一組原始Observable的事件子集。
-
buffer()將得到一個新的 Observable,這個 Observable 每次發射一組列表值而不是單個發射,你還可以指定它的 skip 值和 timespan 項數據 -
window()類似于buffer(),但它發射的是 Observable 而不是列表 -
cast()將源 Observable 中每一項數據都轉換成新的類型,轉成了一個不同的 Class。
Combining
-
merge()多個序列合并在一個最終發射的 Observable.mergeDelayError()當所有的 Observable 都完成時,再處理有 error 的情況,發射onError() -
zip()合并兩個或多個 Observables 發射出的數據項,根據指定的函數 Func* 變換它們,并發射一個新值 -
join()基于時間窗口將兩個 Observables 發射的數據結合在一起,組成一個新的 Observable。它可以控制每個 Observable 產生結果的生命周期,在每個結果的生命周期內,可以與另一個 Observable 產生的結果按照一定的規則進行合并!
join方法的用法如下:
observableA.join(observableB,
observableA產生結果生命周期控制函數,
observableB產生結果生命周期控制函數,
observableA產生的結果與observableB產生的結果的合并規則)
藍線和粉色的線表示對應的Observable 上的元素的生命周期。Android RxJava使用介紹(四) RxJava的操作符
combineLatest()像zip()的特殊形式,zip()作用于最近未打包的兩個 Observables,相反combineLatest()作用于最近發射的數據項
and(),then(),when(): 如下:
Pattern2<O1, O2> pattern = JoinObservable.from(obserable1).and(obserable2); Plan0<O1> plan = pattern.then(this::updateTitle); JoinObservable.when(plan).toObservable().observeOn(…).subscribe(…);
解釋:兩個發射序列 obserable1 和 obserable2 通過 and 鏈接。使用 pattern 對象創建 Plan 對象,然后使用 when...(好吧,我想不到使用場景...)
-
switch()將一個發射多個 Observables 的 Observable 轉換成另一個單獨的 Observable,后者發射那些 Observables 最近發射的數據項,注:當源 Observable 發射一個新的 Observable 時,switch()會立即取消訂閱前一個發射數據的 Observable,然后訂閱一個新的 Observable,并開始發射它的數據。 -
startWith()與concat()對應,通過傳一個參數來先發射一個數據序列
Part 2: Tips
Tips1
// Our sources (left as an exercise for the reader) Observable<Data> memory = ...; Observable<Data> disk = ...; Observable<Data> network = ...; // Retrieve the first source with data Observable<Data> source = Observable .concat(memory, disk, network) .first(); //先取 memory 中的數據,如果有,就取出,然后停止檢索隊列;沒有就取 disk 的數據,有就取出,然后停止檢索隊列;最后才是網絡請求
//持久化數據or緩存數據
Observable<Data> networkWithSave = network.doOnNext(new Action1<Data>() {
@Override public void call(Data data) {
saveToDisk(data);
cacheInMemory(data);
}
});
Observable<Data> diskWithCache = disk.doOnNext(new Action1<Data>() {
@Override public void call(Data data) {
cacheInMemory(data);
}
});
//現在,如果你使用 networkWithSave 和 diskWithCache,數據將會在加載后自動保存//處理陳舊數據
Observable<Data> source = Observable
.concat(memory, diskWithCache, networkWithSave)
.first(new Func1<Data, Boolean>() {
@Override public Boolean call(Data data) {
return data.isUpToDate();//需要 update 的話,則篩選掉該數據源,檢索下一個數據源
}
});//注:first() 和 takeFirst() 區別在于,如果沒有符合的數據源,first() 會拋 NoSuchElementException 異常 Tips2
-
.subsribeOn()操作符可以改變Observable應該在哪個調度器上執行任務。 -
.observeOn()操作符可以改變Observable將在哪個調度器上發送通知。 -
另外,默認情況下,鏈上的操作符將會在調用
.subsribeOn()的那個線程上執行任務。如下:
Observable.just(1,2,3) .subscribeOn(Schedulers.newThread()) .flatMap(/** 與UI線程無關的邏輯**//)//會在 subscribeOn() 指定的線程上執行任務 .observeOn(AndroidSchedulers.mainThread()) .subscribe();
Tips3
Backpressure(背壓): 事件產生的速度比消費快(在 producer-consumer(生產者-消費者) 模式中)。發生 overproucing 后,當鏈式結構不能承受數據壓力時,就會拋出 MissingBackpressureException 異常。
最常見的 Backpressure 就是連續快速點擊按鈕....
Tips4
再重用操作符的方式上,使用 compose(),而不是 flatMap():
Tips5
Schedulers:
將一個耗時的操作,通過 Scehdulers.io() 放到 I/O 線程中去處理
public static void storeBitmap(Context context, Bitmap bitmap, String filename){
Schedulers.io().createWorker().schedule(() -> {
blockingStoreBitmap(context, bitmap, filename);
})
} Tips6
-
subject可以同時是一個 Observable 也可以是一個 Observer,一個 Subject 可以訂閱一個 Observable,就像一個觀察者,并發射新數據,或者傳遞它接受到的數據,就像一個 Observable。see more -
對于空的 subscribe() 意為僅僅是為了開啟 Observable,而不用管已發出的值。
-
在
subscriber.onNext或subscriber.onCompleted()前檢測觀察者的訂閱情況,使代碼更高效,因為如果沒有觀察者等待時我們就不生成沒必要的數據項。就像這樣:
if (!subscriber.isUnsubscribed()){//避免生成不必要的數據項
return;
}
subscriber.onNext();
if (!subscriber.isUnsubscribed()){
subscriber.onCompleted();
} Tips7
我覺得這個 Tips 是最有用的
先祭出兩個工具類
對于 SchedulersCompat 類,我們的目的,是為了寫出這樣的代碼:
.compose(SchedulersCompat.<SomeEntity>applyExecutorSchedulers());
場景是這樣的:work thread 中處理數據,然后 UI thread 中處理結果。當然,我們知道是要使用 subscribeOn() 和 observeOn() 進行處理。最常見的場景是,調server 的 API 接口取數據的時候,那么,那么多接口,反復寫這兩個操作符是蛋疼的,為了避免這種情況,我們可以通過 compse() 操作符來實現復用,上面這段代碼就實現了這樣的功能。
SchedulersCompat 類中有這么一段 Schedulers.from(ExecutorManager.eventExecutor),哇喔,這里ExecutorManager 類里維護了一個線程池!目的呢!避免線程反復創建,實現線程復用!!!這樣,我就不需要每次都通過Schedulers.newThread()來實現了!!
如果你想了解更多,關于 compose()操作符,可以看這里:小鄧子-避免打斷鏈式結構:使用.compose( )操作符
對于這個 Tips, 我給出一個項目實例:RxFace,這是我在做一個人臉識別的 demo 的時候所寫的,用了 RxJava, retrofit, Okhttp。我在v1.1版本的時候增加通過compose()操作符復用 subscribeOn() 和 observeOn() 的邏輯。覺得還 OK 的話,可以點個 star 喔,哈哈
/**
* 這個類是 小鄧子 提供的!
*/
public class SchedulersCompat {
private static final Observable.Transformer computationTransformer =
new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer ioTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer newTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer trampolineTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.trampoline())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer executorTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.from(ExecutorManager.eventExecutor))
.observeOn(AndroidSchedulers.mainThread());
}
};
/**
* Don't break the chain: use RxJava's compose() operator
*/
public static <T> Observable.Transformer<T, T> applyComputationSchedulers() {
return (Observable.Transformer<T, T>) computationTransformer;
}
public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
return (Observable.Transformer<T, T>) ioTransformer;
}
public static <T> Observable.Transformer<T, T> applyNewSchedulers() {
return (Observable.Transformer<T, T>) newTransformer;
}
public static <T> Observable.Transformer<T, T> applyTrampolineSchedulers() {
return (Observable.Transformer<T, T>) trampolineTransformer;
}
public static <T> Observable.Transformer<T, T> applyExecutorSchedulers() {
return (Observable.Transformer<T, T>) executorTransformer;
}
}/**
* 這個類也是 小鄧子 提供的!!
*/
public class ExecutorManager {
public static final int DEVICE_INFO_UNKNOWN = 0;
public static ExecutorService eventExecutor;
//private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CPU_COUNT = ExecutorManager.getCountOfCPU();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
private static final BlockingQueue<Runnable> eventPoolWaitQueue = new LinkedBlockingQueue<>(128);
private static final ThreadFactory eventThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(@NonNull Runnable r) {
return new Thread(r, "eventAsyncAndBackground #" + mCount.getAndIncrement());
}
};
private static final RejectedExecutionHandler eventHandler =
new ThreadPoolExecutor.CallerRunsPolicy();
static {
eventExecutor =
new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS,
eventPoolWaitQueue, eventThreadFactory, eventHandler);
}
/**
* Linux中的設備都是以文件的形式存在,CPU也不例外,因此CPU的文件個數就等價與核數。
* Android的CPU 設備文件位于/sys/devices/system/cpu/目錄,文件名的的格式為cpu\d+。
*
* 引用:http://www.jianshu.com/p/f7add443cd32#,感謝 liangfeizc :)
* https://github.com/非死book/device-year-class
*/
public static int getCountOfCPU() {
if (Build.VERSION.SDK_INT <= Build.VERSION_CODES.GINGERBREAD_MR1) {
return 1;
}
int count;
try {
count = new File("/sys/devices/system/cpu/").listFiles(CPU_FILTER).length;
} catch (SecurityException | NullPointerException e) {
count = DEVICE_INFO_UNKNOWN;
}
return count;
}
private static final FileFilter CPU_FILTER = new FileFilter() {
@Override public boolean accept(File pathname) {
String path = pathname.getName();
if (path.startsWith("cpu")) {
for (int i = 3; i < path.length(); i++) {
if (path.charAt(i) < '0' || path.charAt(i) > '9') {
return false;
}
}
return true;
}
return false;
}
};
}