RxJava基本流程和lift源碼分析

jopen 8年前發布 | 10K 次閱讀 Android開發 移動開發

原文出處:http://blog.csdn.net/lzyzsd/article/details/50110355 

首先感謝 扔物線 哥哥給的配圖,實在太贊了。

基本結構

我們先來看一段最基本的代碼,分析這段代碼在RxJava中是如何實現的。

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("1");
        subscriber.onCompleted();
    }
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {

    }
};

Observable.create(onSubscriber1)
        .subscribe(subscriber1);

首先我們來看一下Observable.create的代碼

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

直接就是調用了Observable的構造函數來創建一個新的Observable對象,這個對象我們暫時標記為observable1,以便后面追溯。 
同時,會將我們傳入的OnSubscribe對象onSubscribe1保存在observable1的onSubscribe屬性中,這個屬性在后面的上下文中很重要,大家留心一下。

接下來我們來看看subscribe方法。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    ...
    subscriber.onStart();
    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    return hook.onSubscribeReturn(subscriber);
}

可以看到,subscribe之后,就直接調用了observable1.onSubscribe.call方法,也就是我們代碼中的onSubscribe1對象的call方法 
,傳入的參數就是我們代碼中定義的subscriber1對象。call方法中所做的事情就是調用傳入的subscriber1對象的onNext和onComplete方法。 
這樣就實現了觀察者和被觀察者之間的通訊,是不是很簡單?

public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("1");
    subscriber.onCompleted();
}

20151130114729404.gif

lift

講之前先上一個簡單的lift流程圖吧 

20151130114942969.gif

lift方法是RxJava中實現自定義operator的關鍵,這里我們以最簡單的map為例,來分析一下lift方法的工作原理,我們對上面的demo代碼稍作修改

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("1");
        subscriber.onCompleted();
    }
};
Subscriber<Integer> subscriber1 = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Integer i) {

    }
};
Func1<String, Integer> transformer1 = new Func1<String, Integer>() {
    @Override
    public Integer call(String s) {
        return Integer.parseInt(s);
    }
};

Observable.create(onSubscriber1)
        .map(transformer1)
        .subscribe(subscriber1);

和剛才不同的是我們在create之后調用了map方法,然后才調用subscribe方法。 
map方法的代碼如下:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

一堆泛型參數是不是略暈啊,別急,我們慢慢來看。

首先來介紹一下Func這個接口。RxJava中有一系列Action+數字,Func+數字的接口,這些接口中都只有一個call方法,其中Action接口的call方法都沒有返回值, 
Func接口的call方法都有返回值,后面的那個數字表示call方法接受幾個泛型類型的參數。

其實主要是因為Java中函數不是一等公民,所以只能用接口這么?嗦的格式,還好我們可以使用lambda簡化我們的代碼。(羨慕函數式語言)

這里map方法接收的參數類型為Func1<? super T, ? extends R> func,表示func的call方法接收一個T類型的參數,返回一個R類型的返回值。

OperatorMap又是什么鬼呢?

public final class OperatorMap<T, R> implements Operator<R, T>

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>

這里可以看到OperatorMap繼承自Operator, 而Operator又繼承自Func1接口,也就是說Operator接口的call方法會接收一個Subscriber類型的參數, 
并且返回另外一個Subscriber類型的對象。Operator.call方法返回一個Subscriber對象,其實我們可以這么理解,每一個operator也是一個訂閱者, 
它返回的Subscriber對象正好用來訂閱Observable發出來的消息。

有一點需要注意的是OperatorMap和Operator的泛型參數順序剛好是相反的,為什么要這么做呢?其實很簡單,因為Operator本身是對Observable發出的數據 
進行轉換的,所以經常會出現operator轉換之后返回的數據類型變了,而OperatorMap這里剛好顛倒了一下順序,就可以保證call方法返回的Subscriber類型 
可以訂閱Observable發出的數據。

OperatorMap的代碼我們先不看,先來看一下lift方法中都做了些啥吧。

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
            onSubscribe.call(st);
        }
    });
}

lift方法會返回一個新創建的Observable對象,這里我們給這個Observable一個標識observable2。observable2的onSubscribe屬性就是lift中new出來的這個 
OnSubscribe對象。

對照demo中的代碼,我們調用map之后,就調用了subscribe方法,也就是調用了這里的observable2的subscribe方法。 
根據上面的介紹,調用subscribe之后,就會調用observable2.onSubscribe.call方法,call中首先做的事情就是調用OperatorMap的call方法

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    return new Subscriber<T>(o) {
        @Override
        public void onNext(T t) {
            o.onNext(transformer.call(t));
        }
    };
}

OperatorMap的call方法返回了一個Subscriber對象,這里我們標記為subscriber$map,改對象有一個transformer屬性,就是我們在demo中定義的transformer1對象, 
他是一個Func1類型,用來對每一個數據項進行變換。這里call方法中接收的Subscriber其實就是demo中的subscriber1對象。

我們回到lift中創建的observable2的call方法中繼續,拿到OperatorMap返回的Subscriber對象之后,接著調用了onSubscribe.call方法,并且將返回的Subscriber 
對象傳遞進去。這里需要注意的一點就是onSubscribe對象就是我們在demo中定義的onSubscribe1變量,所以就是調用了onSubscribe1.call(subscriber$map)方法。

現在我們就可以從onSubscribe1.call的調用來分析一下數據的轉換過程,首先調用了subscriber map.onNext("1"),subscriber map.onNext中會首先調用 
transformer1.call(“1”),然后使用返回值1,來調用onSubscribe1.onNext(1)方法,最終demo中的onSubscribe1就收到了1這個值。

20151130115010316.gif

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!