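Implementing RxJava Yourself to Understand Its Call Chain
RxJava has a large API surface and intricate call chains. Complex material is usually best learned from the whole down to the details, so to study how RxJava works, this article follows its source code and builds a simplified RxJava by hand. The goal is to understand the call chain.
Before reading, it is worth downloading the LittleRx code: reading code in an IDE is much clearer than on a web page, and you can also inspect the printed logs.
The four most important classes are Observable, OnSubscribe, Operator, and Subscriber.
1. The simplest case: create an Observable, then subscribe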
Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });
public class Observable<T> {
    private OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public final void subscribe(Subscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<>(onSubscribe);
    }
}
This shows that subscribe(subscriber) --> onSubscribe.call(subscriber), so without a subscription OnSubscribe.call() is never triggered.
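To make the laziness concrete, here is a minimal, self-contained sketch. The three types are cut-down stand-ins for the article's classes, and `LazyDemo` with its `log` list is a name invented purely for illustration. It records the order of events to show that nothing is emitted until subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Cut-down stand-ins for the article's types (illustrative only).
interface OnSubscribe<T> {
    void call(Subscriber<? super T> subscriber);
}

abstract class Subscriber<T> {
    public abstract void onNext(T t);
}

class Observable<T> {
    private final OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<>(onSubscribe);
    }

    public final void subscribe(Subscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }
}

// Hypothetical demo class: records the order in which things happen.
class LazyDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Observable<Integer> observable = Observable.create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                log.add("call");
                subscriber.onNext(1);
            }
        });
        log.add("created"); // create() alone has not invoked call()
        observable.subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("onNext " + value);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [created, call, onNext 1]
    }
}
```

"created" is logged before "call", which confirms that OnSubscribe.call() runs only as a consequence of subscribe().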
2. map and lift
Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });
The same thing written without chaining (anonymous class bodies are omitted for brevity):
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = Observable.create(onSubscribe);
Func1<Integer, String> func = new Func1<>();
Observable<String> observable2 = observable.map(func);
Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);
create() is the same as before, so what does map() do?
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
lift() takes the previous observable's onSubscribe, wraps it in a new OnSubscribeLift, and returns a new observable (observable2). As noted above, subscribe(subscriber) --> onSubscribe.call(subscriber), so OnSubscribeLift is the next thing to look at:
public class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    // No need to study the implementation yet; we come back to it below
    @Override
    public void call(Subscriber<? super R> r) {
        Subscriber<? super T> st = operator.call(r); // this operator is the OperatorMap
        parent.call(st); // parent is the onSubscribe of the first observable
    }
}
Next, look at OperatorMap:
public final class OperatorMap<T, R> implements Operator<R, T> {
    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    // No need to study the implementation yet; we come back to it below
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new MapSubscriber<T, R>(o, transformer);
    }

    // static: the nested class declares its own T and R, so it should not also capture the outer instance
    private static class MapSubscriber<T, R> extends Subscriber<T> {
        private Subscriber<? super R> actual;
        private Func1<? super T, ? extends R> transformer;

        public MapSubscriber(Subscriber<? super R> o, Func1<? super T, ? extends R> transformer) {
            this.actual = o;
            this.transformer = transformer;
        }

        // No need to study the implementation yet; we come back to it below
        @Override
        public void onNext(T t) {
            R r = transformer.call(t);
            actual.onNext(r);
        }
    }
}
Now remove both map() and lift() and express the same thing with only the most basic classes:
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe2);
Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);
At this point it is clear how the first Observable<Integer> is turned into an Observable<String>, and how OnSubscribe<Integer> onSubscribe relates to OnSubscribeLift<Integer, String> onSubscribe2.
So how does the final subscribe() reach the Subscriber<Integer> passed to the first onSubscribe.call(Subscriber<Integer>), and how does that Subscriber<Integer>.onNext(Integer) in turn reach the final subscriber's onNext(String)?
1) observable2.subscribe(subscriber) -->
2) onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
3) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
5) parent.call(st), i.e. onSubscribe.call(st) -->
6) st.onNext(1), i.e. MapSubscriber.onNext(1) -->
7) String string = func.call(1)
8) subscriber.onNext(string)
That completes the analysis of the Observable.create().map().subscribe() call chain.
Many operators are lift() underneath, so the same reasoning carries over to a chain that calls lift() twice: each lift() wraps the previous OnSubscribe in one more OnSubscribeLift.
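The "lift() twice" case can be sketched as follows. This is a compact approximation rather than the LittleRx code: OnSubscribeLift and OperatorMap are inlined as anonymous classes, and `LiftTwiceDemo` and its log entries are hypothetical names used only to record the order of calls through two stacked map() operators.

```java
import java.util.ArrayList;
import java.util.List;

interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
interface Func1<T, R> { R call(T t); }
// Given the downstream Subscriber<R>, an Operator returns the upstream Subscriber<T>.
interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> downstream); }
abstract class Subscriber<T> { public abstract void onNext(T t); }

class Observable<T> {
    private final OnSubscribe<T> onSubscribe;
    private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
    static <T> Observable<T> create(OnSubscribe<T> onSubscribe) { return new Observable<>(onSubscribe); }
    final void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

    // Plays the role of OnSubscribeLift: wrap the parent OnSubscribe in a new one.
    final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        final OnSubscribe<T> parent = onSubscribe;
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> downstream) {
                parent.call(operator.call(downstream));
            }
        });
    }

    // Plays the role of OperatorMap plus MapSubscriber.
    final <R> Observable<R> map(final Func1<? super T, ? extends R> func) {
        return lift(new Operator<R, T>() {
            @Override
            public Subscriber<? super T> call(final Subscriber<? super R> downstream) {
                return new Subscriber<T>() {
                    @Override
                    public void onNext(T t) { downstream.onNext(func.call(t)); }
                };
            }
        });
    }
}

// Hypothetical demo class: logs each step of a chain with two map() calls.
class LiftTwiceDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Observable
                .create(new OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); }
                })
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        log.add("func1(" + integer + ")");
                        return "map" + integer;
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        log.add("func2(" + s + ")");
                        return s.length();
                    }
                })
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer length) { log.add("onNext(" + length + ")"); }
                });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [func1(1), func2(map1), onNext(4)]
    }
}
```

Subscription travels upstream through both lifted OnSubscribe objects, and the value then travels downstream through both map subscribers in the order they were written.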
3. subscribeOn
Scheduler internals are fairly involved, so we simplify subscribeOn(Scheduler) into subscribeOnIO().
Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });
How can subscribeOnIO() make the first observable's onSubscribe run on a worker thread?
public final Observable<T> subscribeOnIO() {
    return create(new OnSubscribeOnIO<T>(this));
}

public final class OnSubscribeOnIO<T> implements OnSubscribe<T> {
    private static ExecutorService threadPool = Executors.newCachedThreadPool();
    final Observable<T> source;

    public OnSubscribeOnIO(Observable<T> source) {
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                source.subscribe(subscriber); // --> onSubscribe.call(subscriber) --> subscriber.onNext(1)
            }
        };
        threadPool.submit(runnable);
    }
}
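As shown above, subscribeOnIO() wraps the subscription in a Runnable and submits it to a cached thread pool. The previous Observable is subscribed to on a worker thread, and every subsequent call completes on that thread.
Now a slightly more complex case that adds map():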
Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println(Thread.currentThread());
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                System.out.println(Thread.currentThread());
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(Thread.currentThread());
                System.out.println(s);
            }
        });
The same thing written without chaining (anonymous class bodies are omitted for brevity):
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe2);
OnSubscribeOnIO<String> onSubscribe3 = new OnSubscribeOnIO<>(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe3);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);
1) observable3.subscribe(subscriber) -->
2) onSubscribe3.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) -->
3) worker thread: new Runnable(){} --> observable2.subscribe(subscriber) -->
4) onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
5) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
7) parent.call(st), i.e. onSubscribe.call(st) -->
8) st.onNext(1), i.e. MapSubscriber.onNext(1) -->
9) String string = func.call(1)
10) subscriber.onNext(string)
What if map() and subscribeOnIO() swap places?
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
OnSubscribeOnIO<Integer> onSubscribe2 = new OnSubscribeOnIO<>(observable);
Observable<Integer> observable2 = new Observable<>(onSubscribe2);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable3 = new Observable<>(onSubscribe3);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);
1) observable3.subscribe(subscriber) -->
2) onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
3) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
5) parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeOnIO.call(st) -->
6) worker thread: new Runnable(){} --> observable.subscribe(st) -->
7) onSubscribe.call(st) -->
8) st.onNext(1), i.e. MapSubscriber.onNext(1) -->
9) String string = func.call(1)
10) subscriber.onNext(string)
As you can see, wherever subscribeOnIO() is placed, the first onSubscribe.call() always runs on the worker thread.
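The claim can be checked with a small self-contained sketch. Several things here are assumptions made for the example rather than the LittleRx code: the pool threads are named "io-worker" and marked as daemon threads, map() is written directly instead of going through lift() to keep the listing short, and `SubscribeOnDemo.sourceThread` is a hypothetical helper that reports which thread ran the first onSubscribe.call().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
interface Func1<T, R> { R call(T t); }
abstract class Subscriber<T> { public abstract void onNext(T t); }

class Observable<T> {
    // Assumption for the demo: named daemon threads, so they are easy to recognise
    // and do not keep the JVM alive.
    private static final ExecutorService IO = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "io-worker");
        t.setDaemon(true);
        return t;
    });
    private final OnSubscribe<T> onSubscribe;
    private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
    static <T> Observable<T> create(OnSubscribe<T> onSubscribe) { return new Observable<>(onSubscribe); }
    void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

    // Written without lift() only to keep the sketch short.
    <R> Observable<R> map(final Func1<? super T, ? extends R> func) {
        final OnSubscribe<T> parent = onSubscribe;
        return create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> downstream) {
                parent.call(new Subscriber<T>() {
                    @Override
                    public void onNext(T t) { downstream.onNext(func.call(t)); }
                });
            }
        });
    }

    Observable<T> subscribeOnIO() {
        final Observable<T> source = this;
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                IO.execute(() -> source.subscribe(subscriber)); // subscribe upstream on a pool thread
            }
        });
    }
}

class SubscribeOnDemo {
    // Returns the name of the thread on which the first onSubscribe.call() ran.
    static String sourceThread(boolean subscribeOnBeforeMap) {
        final String[] seen = new String[1];
        final CountDownLatch done = new CountDownLatch(1);
        Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                seen[0] = Thread.currentThread().getName();
                subscriber.onNext(1);
            }
        });
        Func1<Integer, String> func = i -> "map" + i;
        Observable<String> chain = subscribeOnBeforeMap
                ? source.subscribeOnIO().map(func)
                : source.map(func).subscribeOnIO();
        chain.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) { done.countDown(); }
        });
        try {
            done.await(); // wait for the worker thread to deliver the value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(sourceThread(true));
        System.out.println(sourceThread(false));
    }
}
```

Both orderings report the pool thread, which matches the two call chains traced above.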
4. observeOn
First, the final form of the demo:
Handler handler = new Handler();
Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .observeOn(handler)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });
handler.loop(); // blocks the current thread while the queue is empty, until a new message arrives
As before, we implement a simple observeOn(Handler) ourselves that can switch back to the main thread.
public class Observable<T> {
    ...
    public final Observable<T> observeOn(Handler handler) {
        return lift(new OperatorObserveOn<T>(handler));
    }
}
OperatorObserveOn
public final class OperatorObserveOn<T> implements Operator<T, T> {
    private Handler handler;

    public OperatorObserveOn(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        Subscriber<T> s = new Subscriber<T>() {
            @Override
            public void onNext(final T t) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext(t);
                    }
                });
            }
        };
        return s;
    }
}
A custom Handler:
public class Handler {
    private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);

    public void loop() {
        for (; ; ) {
            Runnable runnable;
            try {
                runnable = queue.take(); // blocks while the queue is empty; wakes up when an item arrives
            } catch (InterruptedException e) {
                return;
            }
            if (runnable == null) {
                return;
            }
            runnable.run();
        }
    }

    public void post(Runnable runnable) {
        try {
            queue.put(runnable); // blocks while the queue is full, until space is available
        } catch (InterruptedException e) {
            return;
        }
    }
}
The same thing written without chaining (anonymous class bodies are omitted for brevity):
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
OperatorObserveOn<Integer> operatorObserveOn = new OperatorObserveOn<>(handler);
OnSubscribeLift<Integer, Integer> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorObserveOn);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe3);
OnSubscribeOnIO<String> onSubscribe4 = new OnSubscribeOnIO<>(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe4);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);
1) observable3.subscribe(subscriber) -->
2) onSubscribe4.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) -->
3) worker thread: new Runnable(){} --> observable2.subscribe(subscriber) -->
4) onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) -->
5) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.
6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
7) parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeLift.call(st) -->
8) Subscriber<Integer> st2 = operatorObserveOn.call(st) -->
9) parent.call(st2), i.e. onSubscribe.call(st2) -->
10) st2.onNext(1) --> // onNext() switches to the thread running the Handler
11) st.onNext(1) -->
12) String string = func.call(1)
13) subscriber.onNext(string)
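5. Miscellaneous
Overall, the call chains are indeed somewhat complex, but still manageable. Any single chain becomes clear after a little thought. The catch is that every new chain takes that time again; I still cannot work one out within a few seconds, so the only remedy is more thinking and more practice. As an exercise, take the example above: what happens if observeOn(handler) is placed after map()?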
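One way to explore that question is to run both orderings and record which thread executes map()'s function and the final onNext(). The sketch below rests on several assumptions: the classes are compact stand-ins for the article's, the pool threads are named "io-worker", lambdas are used in a few places for brevity, and the Handler is simplified to a hypothetical runNext() that processes a single task instead of looping forever, so the demo can return.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
interface Func1<T, R> { R call(T t); }
interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> downstream); }
abstract class Subscriber<T> { public abstract void onNext(T t); }

// Simplified Handler: runNext() handles one task and returns (hypothetical helper).
class Handler {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    void post(Runnable task) { queue.add(task); }
    void runNext() {
        try {
            queue.take().run(); // blocks until a task has been posted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Observable<T> {
    private static final ExecutorService IO = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "io-worker");
        t.setDaemon(true); // lets the JVM exit once the demo is done
        return t;
    });
    private final OnSubscribe<T> onSubscribe;
    private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
    static <T> Observable<T> create(OnSubscribe<T> onSubscribe) { return new Observable<>(onSubscribe); }
    void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

    <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        final OnSubscribe<T> parent = onSubscribe;
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> downstream) { parent.call(operator.call(downstream)); }
        });
    }
    <R> Observable<R> map(final Func1<? super T, ? extends R> func) {
        return lift(new Operator<R, T>() {
            @Override
            public Subscriber<? super T> call(final Subscriber<? super R> downstream) {
                return new Subscriber<T>() {
                    @Override
                    public void onNext(T t) { downstream.onNext(func.call(t)); }
                };
            }
        });
    }
    Observable<T> observeOn(final Handler handler) {
        return lift(new Operator<T, T>() {
            @Override
            public Subscriber<? super T> call(final Subscriber<? super T> downstream) {
                return new Subscriber<T>() {
                    @Override
                    public void onNext(final T t) { handler.post(() -> downstream.onNext(t)); }
                };
            }
        });
    }
    Observable<T> subscribeOnIO() {
        final Observable<T> source = this;
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                IO.execute(() -> source.subscribe(subscriber));
            }
        });
    }
}

class ObserveOnDemo {
    // Returns {thread that ran map's Func1, thread that ran the final onNext}.
    static String[] threads(boolean observeOnBeforeMap) {
        final String[] seen = new String[2];
        Handler handler = new Handler();
        Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); }
        });
        Func1<Integer, String> func = i -> {
            seen[0] = Thread.currentThread().getName();
            return "map" + i;
        };
        Observable<String> chain = observeOnBeforeMap
                ? source.observeOn(handler).map(func) : source.map(func).observeOn(handler);
        chain.subscribeOnIO().subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) { seen[1] = Thread.currentThread().getName(); }
        });
        handler.runNext(); // the calling thread plays the role of the Handler thread
        return seen;
    }
}
```

With observeOn(handler) before map(), both the mapping function and the final onNext() run on the thread that drains the Handler. With observeOn(handler) after map(), the mapping function stays on the "io-worker" thread and only the final onNext() moves to the Handler thread. In this sketch, observeOn() affects only what is downstream of it.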
Source: http://www.jianshu.com/p/6558ac156bae