RxJava 教程第三部分:馴服數據流之 高級錯誤處理

tgow0735 8年前發布 | 7K 次閱讀 RxJava Java開發

前面已經知道如何使用 Observer 來處理錯誤情況。在前面一節中我們通過避免 Monad 使用傳統的 Java 方式來處理異常。代碼中可以出現各種各樣的異常情況,并不是每一個異常都需要告訴上層代碼的。在傳統的 Java 中,你可以捕獲一個異常,然后決定是自己處理該異常還是再次拋出去。同樣,在 RxJava 中,你也可以根據異常來執行不同的邏輯而無需結束 Observable,也不再強迫 Observer 處理所有情況。

Resume

onErrorReturn

onErrorReturn 操作函數的功能是:當發生錯誤的時候,發射一個默認值然后結束數據流。所以 Subscriber 看不到異常信息,看到的是正常的數據流結束狀態。

Observable<String> values = Observable.create(o -> {
    o.onNext("Rx");
    o.onNext("is");
    o.onError(new Exception("adjective unknown"));
});
 
values
    .onErrorReturn(e -> "Error: " + e.getMessage())
    .subscribe(v -> System.out.println(v));
 

結果:

Rx
is
Error: adjectiveunknown
 

onErrorResumeNext

onErrorResumeNext 的功能是:當錯誤發生的時候,使用另外一個數據流繼續發射數據。在返回的 Observable 中是看不到錯誤信息的。

public final Observable<T> onErrorResumeNext(
    Observable<? extends T> resumeSequence)
public final Observable<T> onErrorResumeNext(
    Func1<java.lang.Throwable,? extends Observable<? extends T>> resumeFunction)
 

第二個重載的函數可以根據錯誤的信息來返回不同的 Observable。

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onError(new Exception("Oops"));
});
 
values
    .onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
    .subscribe(new PrintSubscriber("with onError: "));
 

結果:

Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onError(new Exception("Oops"));
});
 
values
    .onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
    .subscribe(new PrintSubscriber("with onError: "));
 

利用這個操作函數可以實現把一個異常信息包裝起來再次拋出。在傳統的 Java 中,如果異常發生的時候發現當前無法處理該異常,則會再次拋出該異常。通常情況下都會包裝(Wrap)一下異常信息再拋出。在 Rx 中也可以這樣用:

.onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e)))
 

onExceptionResumeNext

onExceptionResumeNext 和 onErrorResumeNext 的區別是只捕獲 Exception;

Observable<String> values = Observable.create(o -> {
    o.onNext("Rx");
    o.onNext("is");
    //o.onError(new Throwable() {}); // 這個為 error 不會捕獲
    o.onError(new Exception()); // 這個為 Exception 會被捕獲
});
 
values
    .onExceptionResumeNext(Observable.just("hard"))
    .subscribe(v -> System.out.println(v));
 

Retry

如果發生了不定性的異常,則通常會重試一下看看是否正常了。 retry 的功能就算重新訂閱到事件流,并重頭重新開始發射數據。

public final Observable<T> retry()
public final Observable<T> retry(long count)
 

沒有參數的 retry() 函數會一直重試,直到沒有異常發生為止。而帶有參數的 retry(n) 函數會重試 N 次, 如果 N 次后還是失敗,則不再重試了,數據流發射一個異常信息并結束。

Randomrandom = new Random();
Observable<Integer> values = Observable.create(o -> {
    o.onNext(random.nextInt() % 20);
    o.onNext(random.nextInt() % 20);
    o.onError(new Exception());
});
 
values
    .retry(1)
    .subscribe(v -> System.out.println(v));
 

結果:

0
13
9
15
java.lang.Exception
 

上面的示例,發射了兩個數字遇到異常信息,然后重試一次,又發射 兩個數據遇到異常信息,然后拋出該異常并結束。

請注意:上面的示例中兩次發射的數字不一樣。說明 retry 并不像 replay 一樣會緩存之前的數據。一般情況下,這樣的情況都是不合理的。所以一般情況下,只有具有副作用的時候或者 Observable 是 hot 的時候 才應該使用 retry。

retryWhen

retryWhen 更具有控制力。

public final Observable<T> retryWhen(
    Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
 

retryWhen 的參數是一個函數, 該函數的輸入參數為一個異常 Observable,返回值為另外一個 Observable。 輸入參數中包含了 retryWhen 發生時候遇到的異常信息;返回的 Observable 為一個信號,用來判別何時需要重試的:

– 如果返回的 Observable 發射了一個數據,retryWhen 將會執行重試操作

– 如果返回的 Observable 發射了一個錯誤信息,retryWhen 將會發射一個錯誤并不會重試

– 如果返回的 Observable 正常結束了,retryWhen 也正常結束。

參數返回的 Observable 發射的數據類型是無關緊要的。該 Observable 的數據只是用來當做是否重試的信號。數據本身是無用的。

下面一個示例,構造一個等待 100 毫秒再重試的機制:

Observable<Integer> source = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onError(new Exception("Failed"));
});
 
source.retryWhen((o) -> o
        .take(2)
        .delay(100, TimeUnit.MILLISECONDS))
    .timeInterval()
    .subscribe(
        System.out::println,
        System.out::println);
 

結果:

TimeInterval [intervalInMilliseconds=21, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=104, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=103, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
 

源 Observable 發射兩個數字 然后遇到異常;當異常發生的時候,retryWhen 返回的 判斷條件 Observable 會獲取到這個異常,這里等待 100毫秒然后把這個異常當做數據發射出去告訴 retryWhen 開始重試。take(2) 參數確保判斷條件 Observable 只發射兩個數據(源 Observable 出錯兩次)然后結束。所以當源 Observable 出現兩次錯誤以后就不再重試了。

using

using 操作函數是用來管理資源的,如果一個 Observable 需要使用一個資源來發射數據(比如 需要使用一個文件資源,從文件中讀取內容),當該 Observable 結束的時候(不管是正常結束還是異常結束)就釋放該資源。這樣你就不用自己管理資源了, 用 Rx 的方式來管理資源。

public static final <T,Resource> Observable<T> using(
    Func0<Resource> resourceFactory,
    Func1<? super Resource,? extends Observable<? extends T>> observableFactory,
    Action1<? super Resource> disposeAction)
 

using 有三個參數。當 Observable 被訂閱的時候,resourceFactory 用來獲取到需要的資源;observableFactory 用這個資源來發射數據;當 Observable 完成的時候,disposeAction 來釋放資源。

下面的示例中,假設 String 是一個需要管理的資源。

Observable<Character> values = Observable.using(
    () -> {
        String resource = "MyResource";
        System.out.println("Leased: " + resource);
        return resource;
    },
    (resource) -> {
        return Observable.create(o -> {
            for (Character c : resource.toCharArray())
                o.onNext(c);
            o.onCompleted();
        });
    },
    (resource) -> System.out.println("Disposed: " + resource));
 
values
    .subscribe(
        v -> System.out.println(v),
        e -> System.out.println(e));
 

結果:

Leased: MyResource
M
y
R
e
s
o
u
r
c
e
Disposed: MyResource
 

當訂閱到 values 的時候, 調用 resourceFactory 函數返回一個字符串 “MyResource”;observableFactory 使用返回的 “MyResource” 字符串來生成一個 Observable, 該 Observable 發射”MyResource” 字符串中的每個字符;當發生完成的時候, disposeAction 來釋放這個字符串資源。

有一點需要注意: 和使用 create 創建 Observable 一樣,我們需要自己來結束 Observable 的發射(onCompleted 的調用)。如果你沒有結束 Observable,則資源是永遠不會釋放的。

來自: http://blog.chengyunfeng.com/?p=970

 本文由用戶 tgow0735 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!