打造RxJava生命周期管理框架RxLife
詳解Subject
什么是Subject
在RxJava當中,有四個對象值得我們關注:Observable,Subject,Observer,Subscriber,它們之間的關系如下:
對于Observable,Observer,Subscriber我們比較熟悉,故不做說明,重點來看Subject。
通過上面的圖我們可以看出Subject繼承自Observable,也就意味著Subject可以作為被觀察者,另外,它又實現了Observer接口,這意味著它也可以作為觀察者。不難看出,Subject既能作為Observer訂閱Observable,又能作為Observable被其他Observer訂閱。總之,Subject承擔了這么一種角色:對上作為觀察者,對下作為被觀察者。
和Observable必須有訂閱者才能發射數據不一樣,無論Subject是否有訂閱者,它都可以發射數據。這有點類似廣播電臺,不會因為我們關閉收音機就停止廣播,在收聽的人自然收聽的到,沒收聽的人也無關緊要。
常見的Subject
從上面的uml中我們看出,RxJava為我們提供了四種常用的Subject,即
AsyncSubject,BehabviorSubject,PublishSubject,ReplaySubject,下面我們對這四者進行說明:
AsyncSubject
AsyncSubject會緩存最后一個數據并在調用 onCompleted() 時將該數據發送給訂閱者,原理如下:
在該過程中,一旦發生任何異常都不會發送數據到訂閱者,而是發送給訂閱者一個異常通知,即訂閱者只能接受到一個異常的通知,如下:
舉例來說明AsyncSubject的用法:
asyncSubject.onNext("1");
asyncSubject.onNect("2");
asyncSubject.onCompleted();//必須調用才會開始發送數據
以上代碼執行后,訂閱者接受到的數據是2.
BehaviorSubject
當BehaviorSubject被訂閱后,它首先會發送原始Observable最近發射的數據,如果最近沒有,會發射一個默認值,接下繼續發射原始Observable的數據,如下圖:
如果原始的Observable因為發生了錯誤而終止,那么BehaviorSubject在發送一個錯誤通知后不再發射數據,如下:
我們舉例來說明BehabviorSubject的用法:
behaviorSubject.onNext("1");
behaviorSubject.onNect("2");
behaviorSubject.onNext("3");
behaviorSubject.subscribe(new Action<String>(){
@Override
public void call(String s){
System.out.println(“result:”+s);
}
});
behaviorSubject.onNext("4");
輸出結果是3,4.
PublishSubject
默認情況下,RxJava中的Observable一旦被訂閱就開始發送事件,這和我們傳統的觀察者模式有所區別。而PublishSuject的行為則類似傳統的觀察這模式,觀察者可以先訂閱被觀察者,然后在某個時刻手動調用方法來發射數據(訂閱之后的數據)到所有的觀察者。如下圖:
如果原始的Observable因為發生了錯誤而終止,那么PublishSubject在發送一個錯誤通知后不再發射數據,如下:
舉例來說明PublishSubject的用法:
publishSubject.onNext("1");
publishSubject.onNect("2");
publishSubject.onNext("3");//訂閱之前不會被發送
publishSubject.subscribe(new Action<String>(){
@Override
public void call(String s){
System.out.println(“result:”+s);
}
});
publishSubject.onNect("4");
publishSubject.onNect("5");</code></pre>
1,2,3是在訂閱之前的數據,不會被發射,最終輸出結果是4,5。
ReplaySubject
ReplaySubject會緩存所有已經發射的數據,當一個新的訂閱關系產生時,ReplaySuject會將所有數據都發送給他。另外,ReplaySubject支持設置緩存數據和緩存時間。如下圖:

舉例來說明ReplaySubject的用法:
replaySubject.onNext("1");
replaySubject.onNect("2");
replaySubject.onNext("3");
replaySubject.subscribe(new Action<String>(){
@Override
public void call(String s){
System.out.println(“result:”+s);
}
});
replaySubject.onNect("4");</code></pre>
默認情況下ReplaySubject會緩存所有的數據,因此最終數據的結果如下:
result:1
result:2
result:3
result:4
小結
回顧上面所談的,不難看出不同的Subject最大的區別在于發送數據的行為不同,簡單概括如下:
Subject
發射行為
AsyncSubject
不論訂閱發生在什么時候,只會發射最后一個數據
BehaviorSubject
發送訂閱之前一個數據和訂閱之后的全部數據
ReplaySubject
不論訂閱發生在什么時候,都發射全部數據
PublishSubject
發送訂閱之后全部數據
關于Subject更詳細的使用方法請直接查閱api doc.
實現生命周期管理框架(RxLife)
在了解Subject之后就可以開始考慮如何實現一個生命周期管理框架。每當Activity或者Fragment的生命周期發生變化時我們都希望產生一個對應的事件來通知當前所有的訂閱者,這樣我們就可以根據對應的事件去確定是否取消訂閱關系了。
從上面的描述中,我們有兩個問題要解決:
- 如何監聽Activity或Fragmeng生命周期變化并將其發送出去。
- 原有的觀察者如何接受生命周期,并在某生命周期下中斷原有的Observable。
通過以上兩個問題,我們知道我們需要一個既能夠發射生命周期,又能接受生命周期的觀察者,因此不難想到這里需要Subject。生命周期是連續產生的,無論是否有訂閱者,我們只關注最最近的生命周期,因此我們選擇使用BehaviorSubject。
現在我們來考慮如何監聽Activity或Fragment的生命周期,并利用BehaviorSubject發射生命周期。這里我們以Activity為例進行說明。
生命周期事件監聽
定義生命周期事件
我們根據Activity的生命周期,定義相應的事件。
public enum ActivityEvent {
CREATE,
RESUME,
START,
PAUSE,
STOP,
DESTORY
}
監聽生命周期
為了能在Activitiy生命周期變化時發送相應的事件,我們定義了RxAppcompatActivity,該類繼承了AppCompatActivity并重寫器生命周期方法:在不同方法中發射事件到BehaviorSubject中。這就好像我們的BehaviorSubject對象在不斷的觀察Activity生命周期的變化。當然,由于Subject的特性,BehaviorSubject也具備了將這些事件發射出去的能力。
public class RxAppCompatActivity extends AppCompatActivity {
protected final BehaviorSubject<ActivityEvent> lifeSubject = BehaviorSubject.create();
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifeSubject.onNext(ActivityEvent.CREATE);
}
@Override
protected void onResume() {
super.onResume();
lifeSubject.onNext(ActivityEvent.RESUME);
}
@Override
protected void onStart() {
super.onStart();
lifeSubject.onNext(ActivityEvent.START);
}
@Override
protected void onPause() {
super.onPause();
lifeSubject.onNext(ActivityEvent.PAUSE);
}
@Override
protected void onStop() {
super.onStop();
lifeSubject.onNext(ActivityEvent.STOP);
}
@Override
protected void onDestroy() {
super.onDestroy();
lifeSubject.onNext(ActivityEvent.DESTORY);
}
}</code></pre>
Observable自動停止發射數據
到現在我們已經利用Subject來監視生命周期的變化,那又如何讓原有的Observable(比如網絡請求的Observable)來監視Subject發射的數據呢,并根據Subject的狀態自動停止原始數據的發射?換言之就是一個Observable如何在發射數據的同時監視另一個Observable?
TakeUtil操作符
令人高興的是,RxJava中提供的TakeUntil操作符來實現上述需求。TakeUntil訂閱原始的Observable并發射數據,此外它還監視你提供的第二個Observable。當第二個Observable發射了一項數據或者發射一項終止的通知時(onError通知或者onCompleted通知),TakeUntil返回的Observable會停止發射原始的Observable,如下圖所示:

我們用一個簡單的例子來展示TakeUntil操作符的使用:
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.d("MainActivity", "num:" + num);
}
});
上面的代碼每隔2s進行輸出,現在我們希望5s后自動停止輸出,就可以這樣做:
Observable.interval(2, TimeUnit.SECONDS).takeUntil(Observable.timer(5,TimeUnit.SECONDS)).subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.d("MainActivity", "num:" + num);
}
});
為了讓以上代碼更通用,我們利用compose操作符進行改寫(對compose不熟悉的童鞋自行查閱資料):
private void startIntervalTask1() {
Observable.interval(2, TimeUnit.SECONDS).compose(bindUntilDelay(5)).subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.d("MainActivity", "num:" + num);
}
});
}
@NonNull
private Observable.Transformer<Long, Long> bindUntilDelay(final int delaySecond) {
return new Observable.Transformer<Long, Long>() {
@Override
public Observable<Long> call(Observable<Long> longObservable) {
return longObservable.takeUntil(timer(delaySecond,TimeUnit.SECONDS));
}
};
}</code></pre>
回到正題,現在我們已經有了可以發射生命周期事件的BehaviorSubject,再結合TakeUntil不就可以實現在指定生命周期發生時自動停止原有的Observable了嗎?
結合BehaviorSubject與TakeUntil
有了上面的知識做鋪墊,實現生命周期管理框架也就顯得輕而易舉了。
為了方便使用,我們在RxAppcompatActivity中提供了bindUntilEvent(ActivityEvent nindEvent)方法:
public class RxAppCompatActivity extends AppCompatActivity {
protected final BehaviorSubject<ActivityEvent> lifeSubject = BehaviorSubject.create();
public <T> Observable.Transformer<T, T> bindUntilEvent(final ActivityEvent bindEvent) {
//被監視的Observable
final Observable<ActivityEvent> observable = lifeSubject.takeFirst(new Func1<ActivityEvent, Boolean>() {
@Override
public Boolean call(ActivityEvent event) {
return event.equals(bindEvent);
}
});
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> sourceOb) {
return sourceOb.takeUntil(observable);
}
};
}
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifeSubject.onNext(ActivityEvent.CREATE);
}
@Override
protected void onResume() {
super.onResume();
lifeSubject.onNext(ActivityEvent.RESUME);
}
@Override
protected void onStart() {
super.onStart();
lifeSubject.onNext(ActivityEvent.START);
}
@Override
protected void onPause() {
super.onPause();
lifeSubject.onNext(ActivityEvent.PAUSE);
}
@Override
protected void onStop() {
super.onStop();
lifeSubject.onNext(ActivityEvent.STOP);
}
@Override
protected void onDestroy() {
super.onDestroy();
lifeSubject.onNext(ActivityEvent.DESTORY);
}
}
接下來,我們用同樣的方式來處理Fragment或者其他組件即可。
具體使用
新建的Activity需要繼承我們的RxAppcompatActivity,新建的Fragment則繼承我們的RxFragment,就是這么簡單。
我們同樣還是以 師父說 為例,由于我們的方法基本和RxLifeCycle保持一致,因此只要簡單的改動就可以讓RxLife工作起來,現在就可以用RxLife來代替RxLifeCycle。
仍然做個簡單的示例:
ApiFactory.getWXApi().getWXHot(AppConstant.KEY_WX, getPageSize(), mCurrentPage + 1).compose(this.bindUntilEvent(FragmentEvent.STOP))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mSubscriber);
總結
通過自行實現一個RxJava生命周期管理框架(RxLife)加深le我們對RxJava中Subject的理解。另外,Subject的應用非常廣泛,在下篇文章中,我們將會進一步深入,利用Subject來打造自己的事件通信總線RxBus。
來自:http://blog.csdn.net/dd864140130/article/details/53029617