當鋼鐵俠反應更靈敏-RxJava
本系列的這個部分主要是講一些函數式技巧能給我們的項目帶來的好處。
ReactiveX 中的 RxJava
是一個可以幫助我們輕松處理不同運行環境下的后臺線程或UI線程任務的框架。這在 Android 上一直是我們所有人的噩夢。
這篇文章主要會談談其中(RxJava)的一些 operators 如何能在常見開發任務中為我們節省時間, Reactive Extensions 提供了很多種類的 operators 來讓我們用的更方便。
像以前一樣,大部分代碼和片段都已經上傳到了 Github, 請隨意評論、提 issue 或吐槽!
在本系列 第一部分 中我們介紹了 Dagger 2 ,現在我們更進一步會看到如何降低各層代碼邏輯之間的耦合、增加可擴展性。
RetroLambda
有時,在 Java 開發的大型應用程序中,或 Android 這樣的大型框架中,使用 Java 8 中的 Lambda 表達式這類特性是非常困難或是幾乎不可能的(Android中)。
Retrolambda 就是來解決這個問題的,它會把 Java 8 的字節碼翻譯成低版本 Java 像v7甚至v5、v6的字節碼,這樣可以讓我們在這些低版本中使用到 Lambda 表達式的特性。
Retrolambda 可以通過 Gradle 或 Maven 的方式來使用,我選擇 Gradle 是因為 Android Studio 對其支持的很好。要使用它你只需要將 [Retrolambda](https://github.com/orfjackal/retrolambda)
插件加到項目根目錄的build.gradle
, 同時在 module 的 build 腳本中應用它,再設置 Android Studio 的語言級別到 1.8
,這就完成了。
build.gradle (root)
dependencies {
classpath 'me.tatarka:gradle-retrolambda:3.1.0'
}
{your module}/build.gradle
apply plugin: 'me.tatarka.retrolambda'
android {
...
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
Retrolambda 可以讓你少寫很多重復的代碼,同時理順我們的代碼讓它有更好的可讀性。在 Dan Lew 給出的這個例子中,你可以感受到它所帶來的不同。
沒有 Retrolambda
Observable.just("Hello, world!")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
有 Retrolambda
Observable.just("Hello, world!") .subscribe(
s -> System.out.println(s)
);
在我們的 Avengers 示例中
mCharacterSubscription = mGetCharacterInformationUsecase
.execute().subscribe(
character -> onAvengerReceived(character),
error -> manageError(error)
);
mComicsSubscription = mGetCharacterComicsUsecase
.execute().subscribe(
comics -> Observable.from(comics).subscribe(
comic -> onComicReceived(comic)),
error -> manageError(throwable)
);
ReactiveX
ReactiveX 是一個開源項目的集合,它們所遵循的主要原則都是 Observer 模式、Iterator 模式以及函數式編程。
ReactiveX 同時也提供了可用于異步編程的API,事實上使用這些框架來實現異步任務非常簡單。
ReactiveX 的異步客戶端用法
在使用 ReactiveX 時最棒的是你可以創建一個完全異步的API或客戶端,在實現具體邏輯的時候再決定是把代碼寫成異步的、放到線程池中的一個獨立線程執行、還是同步來執行。
因此我們可以得到一個觀察者模式的API而不是一個阻塞調用的API。
public interface Usecase<T> {
Observable<T> execute();
}
public interface Repository {
Observable<Character> getCharacter (final int characterId);
Observable<List<Comic>> getCharacterComics (final int characterId);
}
RxJava是什么
RxJava 是一個由 Netflix 開發的 Reactive Extensions 的(Java版)實現。其他還有大量主流編程語言的實現,包括 Javascript, Python, Ruby, Go 等等。
Observables 和 Observers
一個 Observable
可以輸出一個或一系列的 objects,這些 objects 會被訂閱到這個Observable
的 Observer
所處理或接收。
把一個 Observer
注冊到一個 Observable
上是很必要的,如果不這么做的話Observable
什么都不會輸出。當 Observer
注冊之后,一個 Subscription
類型的實例會創建,它可以用來取消對 Observable
的訂閱,這通常在 Activities
和Fragments
的 onStop
或 onPause
方法中非常有用,例如:
mCharacterSubscription = mGetCharacterInformationUsecase
.execute().subscribe( ... );
@Override
public void onStop() {
if (!mCharacterSubscription.isUnsubscribed())
mCharacterSubscription.unsubscribe();
if (!mComicsSubscription.isUnsubscribed())
mComicsSubscription.unsubscribe();
}
無論何時 Observer
訂閱 Observable
的消息,它都需要考慮處理3個方法:
– onNext (T)
方法用來接收 Observable
發出的 objects.
– onError (Exception)
,這個方法會在內部拋出異常的時候調用。
– onCompleted()
,這個方法會在 Observable
停止釋放 objects 的時候調用。
我喜歡這張圖
組件間通信
讓我們來看看如何使用 GetCharacterInformationUsecase
這個用例,所有的用例都實現了 Usecase <T>
接口。
public interface Usecase<T> {
Observable<T> execute();
}
這個例子被調用的時候會返回一個 Observable
類型的實例,它可以輕松的和其他的 observables 和 operators 組合成鏈式結構,我們很快將會看到這些 operators 的強大威力。
當我們調用 GetCharacterInformationUsecase
的時候請求我們的倉庫產生一個對應類型的數據源:
@Override
public Observable<Character> execute() {
return mRepository.getCharacter(mCharacterId);
// .awesomeRxStuff();
}
AvengerDetailPresenter
這個 presenter 將會成為我們這個用例的 Observer
,它將會訂閱這個 Observable
發出的所有事件,這個操作可以通過調用subscribe
方法來完成,這樣就把 Observer
和 Observable
關聯在一起了。
實現 onNext
和 onError
方法可以來處理操作結果。 onCompleted
方法并沒有實現,因為在這個例子中不需要。
mCharacterSubscription = mGetCharacterInformationUsecase
.execute().subscribe(
character -> onAvengerReceived(character),
error -> manageError(error));
Retrofit 和 RxJava
Square 出品了 Retrofit , RxJava 支持其中的 rx.Observable
方法,這樣網絡請求的結果就可以通過 Observer
的方式來訂閱、修改或通過 operators 加工。
你一定得十分清楚在什么地方來調用它, Retrofit 的請求會在 Observable
所在線程上來執行,因此當你在 UI 線程(Activity 或 Fragment 中)來調用的話就會報錯。接下來我們講講 Schedulers
!
Schedulers
Schedulers 可以讓你在多線程中使用 operators
和 Observables
。它可以被用在不同的線程、一個 線程 Executor 或是預設的 [Schedulers](http://reactivex.io/documentation/scheduler.html)
中。例如,對于輸入或輸出這類操作會在 Schedulers.io ()
來執行。
RxAndroid 是由 Jake Wharton 和 Matthias K?ppler 開發的一些 Android 下專用的 RxJava 的工具,其中包括一些用來處理 Android 平臺下多進程調用的Schedulers
。
它也提供了使用 Android 的 Handler
來處理并發的方式。
@Override
public Observable<Character> execute() {
return mRepository.getCharacter(mCharacterId)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
這個例子演示了 Rx 提供的處理 Android 中多進程調用的方式,這真是非常的炫酷 ??
Operators
ReactiveX 最厲害的要數它的 operators
了,它們可以用來操作、變換、合并Observables
輸出的 objecs。
我們來看看一個漫畫列表的例子,漫畫都有一個特定的出版年份,我們想要展示特定年份出版的漫畫,這時 ReactiveX 就能大顯身手了!
這個過濾過程是通過 filter
這個 operator 來完成的,它可以作為漫畫的一個約束條件來加以判斷。在這個過程中,詢問用戶要過濾出哪一年,然后使用這個年份來判定一個漫畫是否允許被展示。
public Observable<Comic> filterByYear(String year) {
if (mComics != null) {
return Observable.from(mComics).filter(
comic -> {
for (ComicDate comicDate : comic.getDates())
if (comicDate.getDate().startsWith(year))
return true;
return false;
});
}
return null;
}
異常處理
另一個很好的 Rx 的 operators 能幫我們節省時間提升效率的例子是異常處理的 operators。
設想一個用戶要請求網絡,但是在網絡通道上發生了一些偶然因素,網絡連接在這種情況下受到了影響。
當我們接收到了 Retrofit 拋出的 SocketTimeoutException
異常時,我們可以利用 retry 這個 operator 來處理。
retry 可以接收一個判定條件,就像之前我們在 filter 中所做的一樣,如果返回 true,那么 Rx 就會神奇的再次調用 Observable
重新執行 Retrofit 的網絡請求。
如果最多拋出了3次 SocketTimeoutExceptions
異常,程序會繼續執行后續的onError
來處理異常。
@Override
public Observable<List<Comic>> getCharacterComics(int characterId) {
final String comicsFormat = "comic";
final String comicsType = "comic";
return mMarvelApi.getCharacterComics(
characterId, comicsFormat, comicsType)
.retry((attemps, error) ->
error instanceof SocketTimeoutException &&
attemps < MAX_ATTEMPS);
}
一些參考資料
- ReactiveX
- Reactive Programming in the Netflix API with RxJava
- Use Lambdas on Java 7 and older – Esko Luontola
- Grokking RxJava , Part1: The basics – Dan Lew