謎之RxJava (二) —— Magic Lift
來自: http://segmentfault.com/a/1190000004049841
回顧
上一篇文章 講了Observable、OnSubscribe和Subscriber之間的關系。 我們知道,Observable的具體工作都是在OnSubscribe中完成的。從這個類名我們也知道,如果生成了一個Observable對象,而不進行subscribe,那么什么都不會發生!
OK,RxJava最讓人興奮的就是它有各種各樣的操作符,什么map呀,flatMap呀各種,我們今天要知其然知其所以然,那么他們是如何實現功能的呢?
例子
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "word";
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d("rx", s);
}
});</pre>
lift
我們先看下進行鏈式調用map之后,發生了什么。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
} 對,就是調用了lift函數!,然后把我們的轉換器(Transfomer,我好想翻譯成變形金剛)傳入進去,看下它做了什么事。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
} 來,我來簡化一下
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(...);
} 返回了一個新的Observable對象,這才是重點! 這種鏈式調用看起來特別熟悉?有沒有像javascript中的Promise/A,在then中返回一個Promise對象進行鏈式調用?
OK,那么我們要看下它是如何工作的啦。
在map()調用之后,我們操作的就是新的Observable對象,我們可以把它取名為Observable$2,OK,我們這里調用subscribe,完整的就是Observable$2.subscribe,繼續看到subscribe里,重要的幾個調用:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
注意注意 ! 這里的observable是Observable$2!!也就是說,這里的onSubscribe是,lift中定義的!!
</blockquote>
OK,我們追蹤下去,回到lift的定義中。
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st); //請注意我!! 這個onSubscribe是原始的OnSubScribe對象!!
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}); 一定一定要注意這段函數執行的上下文!,這段函數中的onSubscribe對象指向的是外部類,也就是第一個Observable的onSubScribe!而不是Observable$2中的onSubscribe,OK,謹記這一點之后,看看
Subscriber<? super T> st = hook.onLift(operator).call(o);
這行代碼,就是定義operator,生成一個經過operator操作過的Subscriber,看下OperatorMap這個類中的call方法
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}</pre>
沒錯,對傳入的Subscriber做了一個代理,把轉換后的值傳入。
這樣就生成了一個代理的Subscriber,
最后我們最外層的OnSubscribe對象對我們代理的Subscriber進行了調用。。
也就是
@Override
public void call(Subscriber<? super String> subscriber) {
//此處的subscriber就是被map包裹(wrapper)后的對象。
subscriber.onNext("hello");
} 然后這個subscriber傳入到內部,鏈式的通知,最后通知到我們在subscribe函數中定義的對象。
這時候要盜下扔物線大大文章的圖

還不明白的各位,可以自己寫一個Demo試一下。
下一章講下RxJava中很重要的線程切換。