RxJava基本流程和lift源碼分析
原文出處:http://blog.csdn.net/lzyzsd/article/details/50110355
首先感謝 扔物線 哥哥給的配圖,實在太贊了。
基本結構
我們先來看一段最基本的代碼,分析這段代碼在RxJava中是如何實現的。
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); } }; Subscriber<String> subscriber1 = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }; Observable.create(onSubscriber1) .subscribe(subscriber1);
首先我們來看一下Observable.create的代碼
public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); } protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
直接就是調用了Observable的構造函數來創建一個新的Observable對象,這個對象我們暫時標記為observable1,以便后面追溯。
同時,會將我們傳入的OnSubscribe對象onSubscribe1保存在observable1的onSubscribe屬性中,這個屬性在后面的上下文中很重要,大家留心一下。
接下來我們來看看subscribe方法。
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); }
可以看到,subscribe之后,就直接調用了observable1.onSubscribe.call方法,也就是我們代碼中的onSubscribe1對象的call方法
,傳入的參數就是我們代碼中定義的subscriber1對象。call方法中所做的事情就是調用傳入的subscriber1對象的onNext和onComplete方法。
這樣就實現了觀察者和被觀察者之間的通訊,是不是很簡單?
public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); }
lift
講之前先上一個簡單的lift流程圖吧
lift方法是RxJava中實現自定義operator的關鍵,這里我們以最簡單的map為例,來分析一下lift方法的工作原理,我們對上面的demo代碼稍作修改
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); } }; Subscriber<Integer> subscriber1 = new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer i) { } }; Func1<String, Integer> transformer1 = new Func1<String, Integer>() { @Override public Integer call(String s) { return Integer.parseInt(s); } }; Observable.create(onSubscriber1) .map(transformer1) .subscribe(subscriber1);
和剛才不同的是我們在create之后調用了map方法,然后才調用subscribe方法。
map方法的代碼如下:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
一堆泛型參數是不是略暈啊,別急,我們慢慢來看。
首先來介紹一下Func這個接口。RxJava中有一系列Action+數字,Func+數字的接口,這些接口中都只有一個call方法,其中Action接口的call方法都沒有返回值,
Func接口的call方法都有返回值,后面的那個數字表示call方法接受幾個泛型類型的參數。
其實主要是因為Java中函數不是一等公民,所以只能用接口這么?嗦的格式,還好我們可以使用lambda簡化我們的代碼。(羨慕函數式語言)
這里map方法接收的參數類型為Func1<? super T, ? extends R> func,表示func的call方法接收一個T類型的參數,返回一個R類型的返回值。
OperatorMap又是什么鬼呢?
public final class OperatorMap<T, R> implements Operator<R, T> public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
這里可以看到OperatorMap繼承自Operator, 而Operator又繼承自Func1接口,也就是說Operator接口的call方法會接收一個Subscriber類型的參數,
并且返回另外一個Subscriber類型的對象。Operator.call方法返回一個Subscriber對象,其實我們可以這么理解,每一個operator也是一個訂閱者,
它返回的Subscriber對象正好用來訂閱Observable發出來的消息。
有一點需要注意的是OperatorMap和Operator的泛型參數順序剛好是相反的,為什么要這么做呢?其實很簡單,因為Operator本身是對Observable發出的數據
進行轉換的,所以經常會出現operator轉換之后返回的數據類型變了,而OperatorMap這里剛好顛倒了一下順序,就可以保證call方法返回的Subscriber類型
可以訂閱Observable發出的數據。
OperatorMap的代碼我們先不看,先來看一下lift方法中都做了些啥吧。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { Subscriber<? super T> st = hook.onLift(operator).call(o); st.onStart(); onSubscribe.call(st); } }); }
lift方法會返回一個新創建的Observable對象,這里我們給這個Observable一個標識observable2。observable2的onSubscribe屬性就是lift中new出來的這個
OnSubscribe對象。
對照demo中的代碼,我們調用map之后,就調用了subscribe方法,也就是調用了這里的observable2的subscribe方法。
根據上面的介紹,調用subscribe之后,就會調用observable2.onSubscribe.call方法,call中首先做的事情就是調用OperatorMap的call方法
@Override public Subscriber<? super T> call(final Subscriber<? super R> o) { return new Subscriber<T>(o) { @Override public void onNext(T t) { o.onNext(transformer.call(t)); } }; }
OperatorMap的call方法返回了一個Subscriber對象,這里我們標記為subscriber$map,改對象有一個transformer屬性,就是我們在demo中定義的transformer1對象,
他是一個Func1類型,用來對每一個數據項進行變換。這里call方法中接收的Subscriber其實就是demo中的subscriber1對象。
我們回到lift中創建的observable2的call方法中繼續,拿到OperatorMap返回的Subscriber對象之后,接著調用了onSubscribe.call方法,并且將返回的Subscriber
對象傳遞進去。這里需要注意的一點就是onSubscribe對象就是我們在demo中定義的onSubscribe1變量,所以就是調用了onSubscribe1.call(subscriber$map)方法。
現在我們就可以從onSubscribe1.call的調用來分析一下數據的轉換過程,首先調用了subscriber
transformer1.call(“1”),然后使用返回值1,來調用onSubscribe1.onNext(1)方法,最終demo中的onSubscribe1就收到了1這個值。