RxJava 教程第四部分:并發 之數據流發射太快如何辦
Backpressure
Rx 中的數據流是從一個地方發射到另外一個地方。每個地方處理數據的速度是不一樣的。如果生產者發射數據的速度比消費者處理的快會出現什么情況?在同步操作中,這不是個問題,例如:
// Produce Observable<Integer> producer = Observable.create(o -> { o.onNext(1); o.onNext(2); o.onCompleted(); }); // Consume producer.subscribe(i -> { try { Thread.sleep(1000); System.out.println(i); } catch (Exception e) { } });
雖然上面的消費者處理數據的速度慢,但是由于是同步調用的,所以當 o.onNext(1) 執行后,一直阻塞到消費者處理完才執行 o.onNext(2)。
但是生產者和消費者異步處理的情況很常見。如果是在異步的情況下會出現什么情況呢?
在傳統的 pull 模型中,當消費者請求數據的時候,如果生產者比較慢,則消費者會阻塞等待。如果生產者比較快,則生產者會等待消費者處理完后再生產新的數據。
而 Rx 為 push 模型。 在 Rx 中,只要生產者數據好了就發射出去了。如果生產者比較慢,則消費者就會等待新的數據到來。如果生產者快,則就會有很多數據發射給消費者,而不管消費者當前有沒有能力處理數據。這樣會導致一個問題,例如:
Observable.interval(1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.newThread()) .subscribe( i -> { System.out.println(i); try { Thread.sleep(100); } catch (Exception e) { } }, System.out::println);
結果:
0 1 rx.exceptions.MissingBackpressureException
上面的 MissingBackpressureException 告訴我們,生產者太快了,我們的操作函數無法處理這種情況。
消費者的補救措施
有些操作函數可以減少發送給消費者的數據。
過濾數據
sample 操作函數可以指定生產者發射數據的最大速度,多余的數據被丟棄了。
Observable.interval(1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.newThread()) .sample(100, TimeUnit.MILLISECONDS) .subscribe( i -> { System.out.println(i); try { Thread.sleep(100); } catch (Exception e) { } }, System.out::println);
結果:
82 182 283 ...
throttle 和Debounce 也能實現類似的效果。
Collect
如果你不想丟棄數據,則當消費者忙的時候可以使用 buffer 和 window 操作函數來收集數據。如果批量處理數據速度比較快,則可以使用這種方式。
Observable.interval(10, TimeUnit.MILLISECONDS) .observeOn(Schedulers.newThread()) .buffer(100, TimeUnit.MILLISECONDS) .subscribe( i -> { System.out.println(i); try { Thread.sleep(100); } catch (Exception e) { } }, System.out::println);
結果:
[0, 1, 2, 3, 4, 5, 6, 7] [8, 9, 10, 11, 12, 13, 14, 15, 16, 17] [18, 19, 20, 21, 22, 23, 24, 25, 26, 27] ...
Reactive pull
上面的方式有時候可以解決問題,但是并不是 Rx 中最好的處理方式。有時候在 生產者這里處理可能是最好的情況。Backpressure 是一種用來在生產者端降低發射速度的方式。
RxJava 實現了一種通過 Subscriber 來通知 Observable 發射數據的方式。Subscriber 有個函數 request(n),調用該函數用來通知 Observable 現在 Subscriber 準備接受下面 n 個數據了。在 Subscriber 的 onStart 函數里面調用 request 函數則就開啟了reactive pull backpressure。這并不是傳統的 pull 模型,并不會阻塞調用。只是 Subscriber 通知 Observable 當前 Subscriber 的處理能力。 通過調用 request 可以發射更多的數據。
class MySubscriber extends Subscriber<T> { @Override public void onStart() { request(1); } @Override public void onCompleted() { ... } @Override public void onError(Throwable e) { ... } @Override public void onNext(T n) { ... request(1); } }
在 onStart 函數中調用 request(1) 開啟了 backpressure 模式,告訴 Observable 一次只發射一個數據。在 onNext 里面處理完該數據后,可以請求下一個數據。通過 quest(Long.MAX_VALUE) 可以取消 backpressure 模式。
doOnRequested
在副作用一節討論 doOn_ 函數的時候,我們沒有討論 doOnRequested 這個函數:
public final Observable<T> doOnRequest(Action1<java.lang.Long> onRequest)
當 Subscriber 請求更多的時候的時候, doOnRequest 就會被調用。參數中的值為請求的數量。
當前 doOnRequest 還是一個 beta 測試版本的 api。 所以在開發過程中盡量避免使用。下面來演示一下這個 api:
Observable.range(0, 3) .doOnRequest(i -> System.out.println("Requested " + i)) .subscribe(System.out::println);
結果:
Requested 9223372036854775807 0 1 2
可以看到 subscriber 在開始的時候,請求了最大數量的數據。這意味著沒有使用 backpressure 模型。只有當一個 Subscriber 實現了 backpressure 的時候,Subscribe 才能使用該功能。下面是一個在外部實現 控制backpressure 的示例:
public class ControlledPullSubscriber<T> extends Subscriber<T> { private final Action1<T> onNextAction; private final Action1<Throwable> onErrorAction; private final Action0onCompletedAction; public ControlledPullSubscriber( Action1<T> onNextAction, Action1<Throwable> onErrorAction, Action0onCompletedAction) { this.onNextAction = onNextAction; this.onErrorAction = onErrorAction; this.onCompletedAction = onCompletedAction; } public ControlledPullSubscriber( Action1<T> onNextAction, Action1<Throwable> onErrorAction) { this(onNextAction, onErrorAction, () -> {}); } public ControlledPullSubscriber(Action1<T> onNextAction) { this(onNextAction, e -> {}, () -> {}); } @Override public void onStart() { request(0); } @Override public void onCompleted() { onCompletedAction.call(); } @Override public void onError(Throwable e) { onErrorAction.call(e); } @Override public void onNext(T t) { onNextAction.call(t); } public void requestMore(int n) { request(n); } }
上面的實現中,如果不主動調用 requestMore 函數,則 Observable 是不會發射數據的。
ControlledPullSubscriber<Integer> puller = new ControlledPullSubscriber<Integer>(System.out::println); Observable.range(0, 3) .doOnRequest(i -> System.out.println("Requested " + i)) .subscribe(puller); puller.requestMore(2); puller.requestMore(1);
結果:
Requested 0 Requested 2 0 1 Requested 1 2
ControlledPullSubscriber 在onStart 中告訴 Observable 先不要發射數據。然后我們分別請求 2個數據和1 個數據。
Rx 操作函數內部使用隊列和緩沖來實現 backpressure ,從而避免保存無限量的數據。大量數據的緩沖應該使用專門的操作函數來處理,例如:cache、buffer 等。 zip 函數就是一個示例,第一個 Observable 可能在第二個 Observable 發射數據之前就發射了一個或者多個數據。所以 zip 需要一個較小的緩沖來匹配兩個 Observable,從而避免操作失敗。因此, zip 內部使用了一個 128 個數據的小緩沖。
Observable.range(0, 300) .doOnRequest(i -> System.out.println("Requested " + i)) .zipWith( Observable.range(10, 300), (i1, i2) -> i1 + " - " + i2) .take(300) .subscribe();
結果:
Requested 128 Requested 90 Requested 90 Requested 90
zip 操作函數一開始請求足夠(128)的數據來填充緩沖并處理這些數據。這里 zip 操作函數具體緩沖的數據并不是主要的。讀者應該記住,在 Rx 中不管開發者有沒有主動啟用該功能,有些操作函數內部會使用該功能。這樣可以保證 Rx 數據流更加穩定可擴展。
Backpressure 策略
很多 Rx 操作函數內部都使用了 backpressure 從而避免過多的數據填滿內部的隊列。這樣處理慢的消費者就會把這種情況傳遞給前面的消費者,前面的消費者開始緩沖數據直到他也緩存滿為止再告訴他前面的消費者。Backpressure 并沒有消除這種情況。只是讓錯誤延遲發生,我們還是需要處理這種情況。
Rx 中有操作函數可以用來處理這種消費者處理不過來的情況。
onBackpressureBuffer
onBackpressureBuffer 會緩存所有當前無法消費的數據,直到 Observer 可以處理為止。
你可以指定緩沖的數量,如果緩沖滿了則會導致數據流失敗。
Observable.interval(1, TimeUnit.MILLISECONDS) .onBackpressureBuffer(1000) .observeOn(Schedulers.newThread()) .subscribe( i -> { System.out.println(i); try { Thread.sleep(100); } catch (Exception e) { } }, System.out::println );
結果:
0 1 2 3 4 5 6 7 8 9 10 11 rx.exceptions.MissingBackpressureException: Overflowedbufferof 1000
上面的示例,生產者比消費者快 100 倍。使用 1000個緩沖來處理這種消費者比較慢的情況。當消費者消費 11個數據的時候,緩沖區滿了,生產者生產了 1100個數據。數據流就拋出異常了。
onBackpressureDrop
如果消費者無法處理數據,則 onBackpressureDrop 就把該數據丟棄了。
Observable.interval(1, TimeUnit.MILLISECONDS) .onBackpressureDrop() .observeOn(Schedulers.newThread()) .subscribe( i -> { System.out.println(i); try { Thread.sleep(100); } catch (Exception e) { } }, System.out::println);
結果:
0 1 2 ... 126 127 12861 12862 ...
這個示例中,前面 128 個數據正常的被處理的,這是應為 observeOn 在切換線程的時候, 使用了一個 128 個數據的小緩沖。