RxJava源碼初探

michelleau 8年前發布 | 18K 次閱讀 RxJava 觀察者模式 JDK Java開發

來自: https://yq.aliyun.com/articles/4201?spm=5176.100239.yqblog1.11.XgWfkE

一、前言

RxJava是用java實現的ReactiveX(Reactive Extensions)框架開源庫。ReactiveX則是大名鼎鼎的響應式編程。而響應式編程和觀察者模式緊緊的相關聯。在看RxJava的源碼中,分析起來會有點麻煩,所以才有了這篇文章,和對這個有興趣的同學一起窺探一二。

二、觀察者模式

2.1 基本原理

觀察者模式是對象的行為模式,又叫發布-訂閱(Publish/Subscribe)模式即讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態上發生變化時,會通知所有觀察者對象,使它們能夠自動更新自己。在JDK中已有對觀察者模式的封裝:Observable.java/Observer.java(請自行對照JDK源碼看看,下面會提出2個問題):觀察者模式圖。

2.2 jdk實現版本

在JDK的實現版本中,Observable是被觀察者,有一個Vector數組來存放觀察者,當被觀察者事件改變時,調用notifyObservers來通知所有的觀察者,每個觀察者調用自己的update來做相應的更新操作。由于這個模式在JDK中實現的比較早(JDK1.0),有2個地方值得思考:

1、觀察者數組使用Vector,為什么不使用List?是否有替代品?

由于觀察者數組必須考慮到線程安全(比如在被觀察者發出通知的那一刻,同時添加新的觀察者,或者刪除某個觀察者等操作會讓被觀察者不知道到底通知哪些觀察者,即會引發線程安全),所以JDK版本中在對vector進行操作的時候,都會加上synchronized且在通知觀察者的時候反向遍歷數組,以此來保證線程安全。

(1)為什么不用List?

這里可以看到通知觀察者的時候需要反向遍歷數組,來保證如果是發出通知時有新的觀察者進來,新的觀察者不會收到當前通知。如果使用List反向遍歷會比使用數組更加復雜。

(2)是否有替代品?

這里的Vector操作滿足(a)讀大于寫,(b)線程安全,(c)寫時復制,所以可以用CopyOnWriteArrayList來代替,代替后實現代碼會更加的簡潔明了。

2、通知方法,通知每個觀察者的時候沒有使用try—catch,是否可以加上?

JDK observable通知方法notifyObservers代碼實現:

for (int i = arrLocal.length-1; i>=0; i--)
    ((Observer)arrLocal[i]).update(this, arg);

初看到這個段代碼時,會有疑惑這個循環中沒有對update加try-catch,當某個觀察者的更新操作拋異常會導致其他的觀察者更新失敗。那么如果我們加上try-catch會發生什么呢,雖然能夠保證每個觀察者都能做更新操作,但是一旦某個觀察者有異常,被被觀察者給捕獲了,而被觀察者捕獲后又不知道交給誰,怎么處理,會導致代碼編寫者以為所有的觀察者都正常執行了,所以在實現Observer的update操作時需要觀察者自己加上try-catch。

三、響應式編程

3.1 響應式編程的特點

上面有點跑了題。回歸正傳,有了觀察者模式的實現做鋪墊,對我們理解響應式編程原理會有很大的幫助。相同點,都是多個觀察者觀察一個被觀察者的狀態,觀察者對狀態變化做出自己的處理。不同點,從我目前看到源碼的程度上來看,我覺得觀察者模式和響應式編程的區別主要有以下幾個:

  • 1 觀察者模式是Observable-Observer,如果我們要在觀察者做操作前,對數據做一些其他的處理怎么辦?觀察者模式無法解耦,只能在observer的update操作中來做。而響應式編程是Obsevable-Operator1-OperatorN-Observer,可以很好的解耦控制數據流操作。
  • 2 響應式編程通過鏈式調用,讓用戶在代碼流程上能夠更加清晰的掌控(即使是異步操作)。
  • 3 觀察者模式無法處理觀察者的異常,需要用戶自己加try-catch結構。而響應式編程提供了另一種方案,用戶不需要使用try-catch只需實現錯誤處理方法就可以做到。
  • 4 觀察者模式若需要使用多線程,需要用戶在observer中實現多線程操作,也就是將觀察者和任務調度糅雜在一起。而響應式編程對觀察者和任務調度解耦,可以通過Schedulers,來處理線程調度問題。
  • 5 觀察者模式,觀察者無法處理自己的調用超時問題,響應式編程則可以設定觀察者的調度超時機制。
  • 6 響應式編程提供阻塞式(BlockingObservable)和非阻塞式Observable的調用

3.2 RxJava源碼分析

上文講了RxJava框架相比觀察者模式的各種優點,現在我們來分析源碼,由于RxJava的源碼比較龐大,本篇博客只分析RxJava的基本原理,即區別中的第一點和第二點。對于其他的點會在另起blog來分析。

3.2.1 調用展現

好了,分析RxJava的基本原理前,我們先上一段測試代碼(此處的Observable是RxJava中的,不是JDK中的):

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
}).map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + " word";
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("error");
    }
    @Override
    public void onNext(String s) {
        System.out.println("rx " + s);
    }
});

輸出:rx hello world

這種連.的寫法對看代碼來說有點累,簡化下:

</div>

(1)Observable<String> observable1 = Observable.create(new OnSubscribe());
(2)Observable<String> observable2 = observable.map(new Func1());
(3)Subscription sub = observable2.subscribe(new Subscriber());

代碼一簡化,我們就可以對上文說的3.1節中響應式編程的特點1:數據流操作和觀察者處理解耦的原理有所了解了。

這里的解耦方式,其實就是在原被觀察者進行數據流操作(map)后生成一個新的被觀察者,觀察者其實訂閱的是數據流操作生成的被觀察者。

在整個過程中,可以看到三個重要的類:

</div>

  • Observable:被觀察者
  • OnSubscriber:Observable的成員變量,用來調用觀察者處理方法
  • Subscriber:觀察者

3.2.2 代碼分析

RxJava具體怎么處理,分3步看:

3.2.2.1 生成被觀察者Observable對象

第一行代碼,(1)Observable observable1 = Observable.create(new OnSubscribe());創建了一個被觀察者,進入源碼

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

/**

  • Invoked when Observable.subscribe is called. */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity }</code></pre>

    即create為Observable的對象observable1創建了一個默認的鉤子hook和一個Onsubscribe對象。

    3.2.2.2 數據流處理

    第二行代碼,(2)Observable observable2 = observable.map(new Func1());創建了一個新的被觀察者2,進入源碼:

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
     return lift(new OperatorMap<T, R>(func));
    }

    這一行代碼很關鍵,也很繞,2個動作

    (1)生成了一個OperatorMap對象。這個對象call方法,參數為觀察者Subscriber對象;返回值是一個新的觀察者Subscriber對象。新的Subscriber對象的onNext方法先調用operator操作(map的call方法)的call方法,拿Operator操作返回的數據,再調用外部觀察者的onNext方法。很熟悉是嗎?上文說到的ReactiveX編程可以實現Observable-Operator1-----OperatorN-Observer,在這段代碼中得到了體現:在調用Observer的OnNext方法前,會先調用OperatorN的call方法對數據處理,處理完成后的數據值,作為參數傳入Observer的OnNext方法。

    我們需要注意的是,OperatorMap的call方法返回的是一個新的觀察者,新的觀察者通過OnNext方法將老的觀察者給鏈接起來的。那么新的觀察者什么時候訂閱被觀察者的呢?這個就是接下來的第(2)個動作:lift

    public final class OperatorMap<T, R> implements Operator<R, T> {
     private final Func1<? super T, ? extends R> transformer;
     public OperatorMap(Func1<? super T, ? extends R> transformer) {

     this.transformer = transformer;
    

    } @Override public Subscriber<? super T> call(final Subscriber<? super R> o) {

     return new Subscriber<T>(o) {
         @Override
         public void onCompleted() {
             o.onCompleted();
         }
         @Override
         public void onError(Throwable e) {
             o.onError(e);
         }
         @Override
         public void onNext(T t) {
             try {
                 o.onNext(transformer.call(t));
             } catch (Throwable e) {
                 Exceptions.throwOrReport(e, this, t);
             }
         }
     };
    

    }

}</code></pre>

(2)調用了observable1的lift方法,這個方法用來干什么呢?

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
            onSubscribe.call(st); 
        });
}

源碼略大我簡化了一下,其實就是原Observable(observable1)的lift方法創建了一個新的Observable對象(observable2),這個新的對象的OnSubscribe成員變量的call方法將map-OpreatorMap生成的觀察者傳入到了最開始的Observable的OnSubscirbe中進行處理。

過程再整理一下:

  • 1、 Create操作:創建一個Observable對象observable1,對象中的OnSubscribe用來響應Subscriber
  • 2、 OperatorMap操作生成新的觀察者,新的觀察者通過OnNext方法,先調用map的call方法處理數據,再調用1中的觀察者Subscriber的OnNext方法。
  • 3、lift 操作生成新的被觀察者observable2,observable2中的call方法通過observable1的成員變量OnSubscribe來調用2中生成的新的觀察者的call方法。

接下來就是最后一步了,什么時候調用observable2的call方法呢?這個就是下面的介紹,被觀察者訂閱觀察者,在訂閱的時候進行調用。

3.2.2.3 觀察者訂閱

第三行代碼,Subscription sub = observable2.subscribe(new Subscriber());訂閱subscribe源碼核心代碼:

hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber);

其實這個就是調用observable2的OnSubscribe的call方法,參數是傳入的訂閱者。

說白了觀察者訂閱其實就是用被觀察者的subscribe方法訂閱觀察者,且調用被觀察者的OnSubscribe方法來調用觀察者的處理方法onNext。

這里可能就會有疑問:為什么是被觀察者來訂閱觀察者,而不是觀察者來訂閱被觀察者?

這種做法可以使整個過程使用連點(.)來完成從數據處理到訂閱者響應的所有流程,代碼上會更加的清晰。

</div>

好了,是時候祭出大招了,映照下面這個圖,你會更加的清晰。

如果,你覺得還不過癮,還有的沒有了解清楚。建議看代碼。。。,下一篇博客會介紹RxJava使用Scheduler多線程源碼分析

</code></code></code></code></code></code></code></code></div>

 本文由用戶 michelleau 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!