RxJava 教程第三部分:馴服數據流之 高級錯誤處理
前面已經知道如何使用 Observer 來處理錯誤情況。在前面一節中我們通過避免 Monad 使用傳統的 Java 方式來處理異常。代碼中可以出現各種各樣的異常情況,并不是每一個異常都需要告訴上層代碼的。在傳統的 Java 中,你可以捕獲一個異常,然后決定是自己處理該異常還是再次拋出去。同樣,在 RxJava 中,你也可以根據異常來執行不同的邏輯而無需結束 Observable,也不再強迫 Observer 處理所有情況。
Resume
onErrorReturn
onErrorReturn 操作函數的功能是:當發生錯誤的時候,發射一個默認值然后結束數據流。所以 Subscriber 看不到異常信息,看到的是正常的數據流結束狀態。
Observable<String> values = Observable.create(o -> {
o.onNext("Rx");
o.onNext("is");
o.onError(new Exception("adjective unknown"));
});
values
.onErrorReturn(e -> "Error: " + e.getMessage())
.subscribe(v -> System.out.println(v));
結果:
Rx
is
Error: adjectiveunknown
onErrorResumeNext
onErrorResumeNext 的功能是:當錯誤發生的時候,使用另外一個數據流繼續發射數據。在返回的 Observable 中是看不到錯誤信息的。
public final Observable<T> onErrorResumeNext(
Observable<? extends T> resumeSequence)
public final Observable<T> onErrorResumeNext(
Func1<java.lang.Throwable,? extends Observable<? extends T>> resumeFunction)
第二個重載的函數可以根據錯誤的信息來返回不同的 Observable。
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onError(new Exception("Oops"));
});
values
.onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
.subscribe(new PrintSubscriber("with onError: "));
結果:
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onError(new Exception("Oops"));
});
values
.onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
.subscribe(new PrintSubscriber("with onError: "));
利用這個操作函數可以實現把一個異常信息包裝起來再次拋出。在傳統的 Java 中,如果異常發生的時候發現當前無法處理該異常,則會再次拋出該異常。通常情況下都會包裝(Wrap)一下異常信息再拋出。在 Rx 中也可以這樣用:
.onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e)))
onExceptionResumeNext
onExceptionResumeNext 和 onErrorResumeNext 的區別是只捕獲 Exception;
Observable<String> values = Observable.create(o -> {
o.onNext("Rx");
o.onNext("is");
//o.onError(new Throwable() {}); // 這個為 error 不會捕獲
o.onError(new Exception()); // 這個為 Exception 會被捕獲
});
values
.onExceptionResumeNext(Observable.just("hard"))
.subscribe(v -> System.out.println(v));
Retry
如果發生了不定性的異常,則通常會重試一下看看是否正常了。 retry 的功能就算重新訂閱到事件流,并重頭重新開始發射數據。
public final Observable<T> retry()
public final Observable<T> retry(long count)
沒有參數的 retry() 函數會一直重試,直到沒有異常發生為止。而帶有參數的 retry(n) 函數會重試 N 次, 如果 N 次后還是失敗,則不再重試了,數據流發射一個異常信息并結束。
Randomrandom = new Random();
Observable<Integer> values = Observable.create(o -> {
o.onNext(random.nextInt() % 20);
o.onNext(random.nextInt() % 20);
o.onError(new Exception());
});
values
.retry(1)
.subscribe(v -> System.out.println(v));
結果:
0
13
9
15
java.lang.Exception
上面的示例,發射了兩個數字遇到異常信息,然后重試一次,又發射 兩個數據遇到異常信息,然后拋出該異常并結束。
請注意:上面的示例中兩次發射的數字不一樣。說明 retry 并不像 replay 一樣會緩存之前的數據。一般情況下,這樣的情況都是不合理的。所以一般情況下,只有具有副作用的時候或者 Observable 是 hot 的時候 才應該使用 retry。
retryWhen
retryWhen 更具有控制力。
public final Observable<T> retryWhen(
Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
retryWhen 的參數是一個函數, 該函數的輸入參數為一個異常 Observable,返回值為另外一個 Observable。 輸入參數中包含了 retryWhen 發生時候遇到的異常信息;返回的 Observable 為一個信號,用來判別何時需要重試的:
– 如果返回的 Observable 發射了一個數據,retryWhen 將會執行重試操作
– 如果返回的 Observable 發射了一個錯誤信息,retryWhen 將會發射一個錯誤并不會重試
– 如果返回的 Observable 正常結束了,retryWhen 也正常結束。
參數返回的 Observable 發射的數據類型是無關緊要的。該 Observable 的數據只是用來當做是否重試的信號。數據本身是無用的。
下面一個示例,構造一個等待 100 毫秒再重試的機制:
Observable<Integer> source = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onError(new Exception("Failed"));
});
source.retryWhen((o) -> o
.take(2)
.delay(100, TimeUnit.MILLISECONDS))
.timeInterval()
.subscribe(
System.out::println,
System.out::println);
結果:
TimeInterval [intervalInMilliseconds=21, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=104, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=103, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
源 Observable 發射兩個數字 然后遇到異常;當異常發生的時候,retryWhen 返回的 判斷條件 Observable 會獲取到這個異常,這里等待 100毫秒然后把這個異常當做數據發射出去告訴 retryWhen 開始重試。take(2) 參數確保判斷條件 Observable 只發射兩個數據(源 Observable 出錯兩次)然后結束。所以當源 Observable 出現兩次錯誤以后就不再重試了。
using
using 操作函數是用來管理資源的,如果一個 Observable 需要使用一個資源來發射數據(比如 需要使用一個文件資源,從文件中讀取內容),當該 Observable 結束的時候(不管是正常結束還是異常結束)就釋放該資源。這樣你就不用自己管理資源了, 用 Rx 的方式來管理資源。
public static final <T,Resource> Observable<T> using(
Func0<Resource> resourceFactory,
Func1<? super Resource,? extends Observable<? extends T>> observableFactory,
Action1<? super Resource> disposeAction)
using 有三個參數。當 Observable 被訂閱的時候,resourceFactory 用來獲取到需要的資源;observableFactory 用這個資源來發射數據;當 Observable 完成的時候,disposeAction 來釋放資源。
下面的示例中,假設 String 是一個需要管理的資源。
Observable<Character> values = Observable.using(
() -> {
String resource = "MyResource";
System.out.println("Leased: " + resource);
return resource;
},
(resource) -> {
return Observable.create(o -> {
for (Character c : resource.toCharArray())
o.onNext(c);
o.onCompleted();
});
},
(resource) -> System.out.println("Disposed: " + resource));
values
.subscribe(
v -> System.out.println(v),
e -> System.out.println(e));
結果:
Leased: MyResource
M
y
R
e
s
o
u
r
c
e
Disposed: MyResource
當訂閱到 values 的時候, 調用 resourceFactory 函數返回一個字符串 “MyResource”;observableFactory 使用返回的 “MyResource” 字符串來生成一個 Observable, 該 Observable 發射”MyResource” 字符串中的每個字符;當發生完成的時候, disposeAction 來釋放這個字符串資源。
有一點需要注意: 和使用 create 創建 Observable 一樣,我們需要自己來結束 Observable 的發射(onCompleted 的調用)。如果你沒有結束 Observable,則資源是永遠不會釋放的。