謎之RxJava (一) ―― 最基本的觀察者模式

jopen 8年前發布 | 11K 次閱讀 Android開發 移動開發

最近在Android界,最火的framework大概就是RxJava了。
扔物線大大之前寫了一篇文章 《給 Android 開發者的 RxJava 詳解》,在我學習RxJava的過程中受益匪淺。經過閱讀這篇文章后,我們來看下RxJava的源碼,揭開它神秘的面紗。

這里準備分幾篇文章寫,為了能讓自己有個喘口氣的機會。

先來上個最最簡單的,經典的Demo。

Demo

Observable.create(new Observable.OnSubscribe<String>() {    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
}).subscribe(new Subscriber<String>() {    @Override
    public void onCompleted() {

    }    @Override
    public void onError(Throwable e) {

    }    @Override
    public void onNext(String s) {
        Log.d("rx", s);
    }
});

這段代碼產生的最終結果就是在Log里會出現hello。

看下這段代碼的具體流程吧。
這里有2個函數create和subscribe,我們看看create里面看了啥。

OnSubscribe對象

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}
// constructor
protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

這里的hook是一個默認實現,里面不做任何事,就是返回f。我們看見create只是給Observable的onSubscribe賦值了我們定義的OnSubscribe。

Subscriber對象

來看下subscribe這個函數做了什么事

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }
    
    // new Subscriber so onStart it
    subscriber.onStart();
    
    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (OnErrorNotImplementedException e2) {
            // special handling when onError is not implemented ... we just rethrow
            throw e2;
        } catch (Throwable e2) {
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}

我們看到,這里我們的subscriber被SafeSubscriber包裹了一層。

if (!(subscriber instanceof SafeSubscriber)) {
    // assign to `observer` so we return the protected version
    subscriber = new SafeSubscriber<T>(subscriber);
}

然后開始執行工作流

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);

默認的hook只是返回我們之前定義的onSubscribe,這里調用的call方法就是我們在外面定義的

new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
})

我們調用傳入的subscriber對象的onNext方法,這里的subscriber是SafeSubscriber
在SafeScriber中

public void onNext(T args) {
    try {
        if (!done) {
            actual.onNext(args);
        }
    } catch (Throwable e) {
        // we handle here instead of another method so we don't add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwIfFatal(e);
        // handle errors if the onNext implementation fails, not just if the Observable fails
        onError(e);
    }
}

actual就是我們自己定義的subscriber。 原來SafeSubscriber只是為了幫我們處理好異常,以及防止工作流的重復。

這是RxJava最最基本的工作流,讓我們認識到他是怎么工作的。之后我們來講講其中的細節和其他神奇的內容。

歡迎關注我Github 以及 weibo@Gemini

原文出處: http://segmentfault.com/a/1190000004049490 

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!