謎之RxJava (二) —— Magic Lift
來自: http://segmentfault.com/a/1190000004049841
回顧
上一篇文章 講了Observable
、OnSubscribe
和Subscriber
之間的關系。 我們知道,Observable
的具體工作都是在OnSubscribe
中完成的。從這個類名我們也知道,如果生成了一個Observable
對象,而不進行subscribe
,那么什么都不會發生!
OK,RxJava
最讓人興奮的就是它有各種各樣的操作符,什么map
呀,flatMap
呀各種,我們今天要知其然知其所以然
,那么他們是如何實現功能的呢?
例子
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); } }) .map(new Func1<String, String>() { @Override public String call(String s) { return s + "word"; } }) .subscribe(new Subscriber<String>() { @Override public void onCompleted() {} @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.d("rx", s); }
});</pre>
lift
我們先看下進行鏈式調用
map
之后,發生了什么。public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }對,就是調用了
lift
函數!,然后把我們的轉換器(Transfomer,我好想翻譯成變形金剛)傳入進去,看下它做了什么事。public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } st.onError(e); } } catch (Throwable e) { if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }); }來,我來簡化一下
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(...); }返回了一個新的
Observable
對象,這才是重點! 這種鏈式調用看起來特別熟悉?有沒有像javascript
中的Promise/A
,在then
中返回一個Promise
對象進行鏈式調用?OK,那么我們要看下它是如何工作的啦。
在
map()
調用之后,我們操作的就是新的Observable
對象,我們可以把它取名為Observable$2
,OK,我們這里調用subscribe
,完整的就是Observable$2.subscribe
,繼續看到subscribe
里,重要的幾個調用:hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber);注意注意 ! 這里的
</blockquote>observable
是Observable$2
!!也就是說,這里的onSubscribe
是,lift
中定義的!!OK,我們追蹤下去,回到
lift
的定義中。return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); //請注意我!! 這個onSubscribe是原始的OnSubScribe對象!! } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } st.onError(e); } } catch (Throwable e) { if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } });一定一定要注意這段函數執行的上下文!,這段函數中的
onSubscribe
對象指向的是外部類,也就是第一個Observable
的onSubScribe
!而不是Observable$2
中的onSubscribe
,OK,謹記這一點之后,看看Subscriber<? super T> st = hook.onLift(operator).call(o);這行代碼,就是定義
operator
,生成一個經過operator
操作過的Subscriber
,看下OperatorMap
這個類中的call
方法@Override public Subscriber<? super T> call(final Subscriber<? super R> o) { return new Subscriber<T>(o) {@Override public void onCompleted() { o.onCompleted(); } @Override public void onError(Throwable e) { o.onError(e); } @Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(OnErrorThrowable.addValueAsLastCause(e, t)); } } };
}</pre>
沒錯,對傳入的
Subscriber
做了一個代理,把轉換后的值傳入。
這樣就生成了一個代理的Subscriber
,最后我們最外層的
OnSubscribe
對象對我們代理的Subscriber
進行了調用。。
也就是@Override public void call(Subscriber<? super String> subscriber) { //此處的subscriber就是被map包裹(wrapper)后的對象。 subscriber.onNext("hello"); }然后這個
subscriber
傳入到內部,鏈式的通知,最后通知到我們在subscribe
函數中定義的對象。這時候要盜下扔物線大大文章的圖
還不明白的各位,可以自己寫一個Demo試一下。
下一章講下
RxJava
中很重要的線程切換。