RxJava 教程第三部分:馴服數據流之 時間平移
Rx 最大的特征之一就是無法預測何時會有數據發射。有些 Observable 會同步的即可發射所有的數據,比如 range ,有些按照一定的時間間隔發射數據、有些根本無法確定到底何時發射數據。例如,鼠標移動時事件和 UDP 數據包到達的時刻。我們需要合適的工具來處理這些無法確定何時發射的事件。
Buffer 緩存
Buffer 可以收集數據并緩存起來,等緩存到固定的數目后一起發射,而不是來一個發射一個。有多種方式可以緩存數據。
Complete, non-overlapping buffering
先來看看每個值只緩沖一次的操作函數。
buffer by count
最簡單的是根據個數來緩沖,當緩沖滿了把這幾個數據一起發射出去。
Observable.range(0, 10)
.buffer(4)
.subscribe(System.out::println);
結果:
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9]
buffer by time
還可以根據時間來緩沖,每隔一段時間緩沖一次,當時間到了,把緩沖的數據一起發射出去。
下面的示例中,每隔100毫秒發射一個數據,使用250毫秒的時間窗口來緩沖數據。
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(250, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
結果:
[0, 1]
[2, 3]
[4, 5, 6]
[7, 8]
[9]
注意:每個時間窗口發射的數據個數可能是不一樣的。在某個時間窗口內,也可能沒有數據發射。
buffer by count and time
也可以同時用數目和時間作為緩沖條件,任意一個條件滿足了(緩沖的個數達到了或者當前時間窗口結束了),就發射緩沖到的數據。
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.buffer(250, TimeUnit.MILLISECONDS, 2)
.subscribe(System.out::println);
結果:
[0, 1]
[]
[2, 3]
[]
[4, 5]
[6]
[7, 8]
[]
[9]
上面這個示例中,發射了很多空的數據。這是應為,發射了2個數據后,達到了數目緩沖的要求,就觸發了一次發射數據的操作,然后過了幾十毫秒又觸發了時間緩沖的條件,這個時候還沒有新的數據發射,所以就觸發一個空的列表發射出來。
buffer with signal
除了時間和數目以外,還可以使用另外一個 Observable 作為信號來緩沖數據,當信號 Observable 的 onNext 事件觸發的時候,源 Observable 中就發射緩沖的數據。
下面的示例和 .buffer(250, TimeUnit.MILLISECONDS) 是一樣的效果:
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
.subscribe(System.out::println);
使用信號緩沖還有另外一種形式,.buffer(() -> Observable.interval(250, TimeUnit.MILLISECONDS)) ,使用一個函數來返回一個信號Observable,而上面是直接使用信號 Observable,區別是,只有當有Subscriber 訂閱的時候,使用函數返回的信號 Observable 才開始執行。
交錯的緩沖
上面所介紹的所有操作函數都有對應的變體函數,可以重疊的緩沖數據,也可以跳過一些數據。
buffer by count
除了指定緩沖的數目以外,還可以指定間隔多少個數據開始下一個緩沖。
從上圖可以看到,每隔 3個數據開始一個緩沖,每次緩沖 2 個數據。所以 每次第三個數據都被丟棄了。 你還可以在前一個緩沖結束之前就開始下一個緩沖。
- 當 count > skip 的時候,緩沖的數據重疊了
- 當 count < skip 的時候,緩沖的數據有丟失
- 當 count = skip 的時候,和前面看到的簡單版本一樣
下面是一個緩沖重疊的示例:
Observable.range(0,10)
.buffer(4, 3)
.subscribe(System.out::println);
結果:
[0, 1, 2, 3]
[3, 4, 5, 6]
[6, 7, 8, 9]
[9]
每隔 3 個數據開始一個緩沖,每次緩沖 4 個數據。
buffer by time
同樣,對于時間緩沖也可以指定緩沖的時間窗口和每個多久開始下一個緩沖。
同樣:
- 當 timespan > timeshift 的時候,緩沖的數據重疊了
- 當 timespan < timeshift 的時候,緩沖的數據有丟失
- 當 timespan = timeshift 的時候,和前面看到的簡單版本一樣
下一個示例中,每隔 200毫秒開啟下一個緩沖,每個緩沖時間窗口是 350毫秒。所以兩個緩沖之間會有 150毫秒的重疊。
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(350, 200, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
結果:
[0, 1, 2]
[2, 3, 4]
[3, 4, 5, 6]
[5, 6, 7, 8]
[7, 8, 9]
[9]
buffer by signal
最后一個強大的緩沖操作可以使用兩個信號 Observable 來分別標示何時開始緩沖何時結束緩沖。
public final <TOpening,TClosing> Observable<java.util.List<T>> buffer(
Observable<? extends TOpening> bufferOpenings,
Func1<? super TOpening,? extends Observable<? extends TClosing>> bufferClosingSelector)
這個操作函數有兩個參數,第一個 bufferOpenings 參數為一個 Observable,只要該 Observable 發射了一個數據,就開始一個新的緩沖。每個緩沖到的數據會傳遞給第二個函數參數 bufferClosingSelector ,bufferClosingSelector 參數為一個函數,該函數創建一個新的 Observable,當這個 Observable 發射數據的時候表明這個緩沖結束。
代碼來演示:
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(
Observable.interval(250, TimeUnit.MILLISECONDS),
i -> Observable.timer(200, TimeUnit.MILLISECONDS))
.subscribe(System.out::println);
結果:
[2, 3]
[4, 5]
[7, 8]
[9]
使用 Observable.interval 創建一個 Observable,這個 Observable 每隔 250毫秒開始一個新的緩沖。由于使用 interval 創建的 Observable,第一個數據是在 250毫秒后發射的,所以第一個緩沖也是從 250毫秒開始的。上面示例中還使用 bufferOpenings Observable 發射的數據作為參數來創建結束緩沖的函數,但是由于上面的示例比較簡單,并沒有使用這個數據(也就是上面示例中的Lambda 表達式中的參數 i 并沒有用)。只是創建了一個等待 200毫秒就發射一個數據的 Observable。這樣每個緩沖的時間窗口就是固定的 200毫秒。
takeLastBuffer
在前面我們已經看到了 takeLast 操作函數,該函數返回最后 N 個數據。takeLast 內部實現需要緩沖最后的 N 個數據,并當數據流結束后從新發送最后 N 個數據。而 takeLastBuffer 把最后的數據作為一個數據返回。
By count
返回最后 N 個數據
Observable.range(0, 5)
.takeLastBuffer(2)
.subscribe(System.out::println);
結果:
[3, 4]
By time
當源 Observable 結束的時候,從后面開始倒數這個時間窗口,把里面所有發射的數據返回。
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5)
.takeLastBuffer(200, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
結果:
[2, 3, 4]
By count and time
在時間窗口上添加一個數量的限制,如果最后的時間窗口中的數據數目大于規定的數量,則只返回最后幾個規定數量的數據。
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5)
.takeLastBuffer(2, 200, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
結果:
[3, 4]
如果修改為 .takeLastBuffer(1, 200, TimeUnit.MILLISECONDS),則返回的結果為 [4]
Delay 延時
顧名思義,delay 操作會延時一段時間再發射數據。有兩種方式實現這個效果;一是緩存這些數據,等一段時間后再發射;或者是把Subscriber 訂閱的時間延遲。
delay
簡單的 delay 函數只是把每個數據都延時一段時間再發射,相當于把整個數據流都往后推遲了。
示例
Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
.delay(1, TimeUnit.SECONDS)
.timeInterval()
.subscribe(System.out::println);
結果:
TimeInterval [intervalInMilliseconds=1109, value=0]
TimeInterval [intervalInMilliseconds=94, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]
TimeInterval [intervalInMilliseconds=100, value=3]
TimeInterval [intervalInMilliseconds=101, value=4]
可以看到,第一個數據差不多被延遲了 1s ,后面每隔 100ms 左右發射下一個數據。
還可以分別推遲每個數據的時間
這個重載函數的參數為一個函數,該函數的參數為源 Observable 發射的數據返回一個 信號Observable。當信號 Observable 發射數據的時候,也就是源 Observable 的數據發射的時候。
Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
.delay(i -> Observable.timer(i * 100, TimeUnit.MILLISECONDS))
.timeInterval()
.subscribe(System.out::println);
結果:
TimeInterval [intervalInMilliseconds=152, value=0]
TimeInterval [intervalInMilliseconds=173, value=1]
TimeInterval [intervalInMilliseconds=199, value=2]
TimeInterval [intervalInMilliseconds=201, value=3]
TimeInterval [intervalInMilliseconds=199, value=4]
源 Observable 每隔 100ms 發射一個數據,而結果顯示為 200ms 發射一個數據。interval 從 0 開始發射數據, i 結果為 0、1、2 等,每隔數據推遲了 i*100ms 再發射。所以后面每隔數據都比前一個數據多推遲了100ms,結果就是每個數據差不多間隔200ms 發射。
delaySubscription
除了緩存數據,延遲發射緩沖的數據以外,還可以選擇使用推遲訂閱的方式。根據 Observable 是 hot 或者 cold 則會有不同的結果。后面會專門的介紹 cold 和 hot Observable 的區別。這里的示例為 cold Observable,推遲訂閱到 cold Observable 和推遲整個數據流是一樣的效果。 但是由于推遲訂閱不需要緩存發射的數據,所以更加高效。
Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
.delaySubscription(1000, TimeUnit.MILLISECONDS)
.timeInterval()
.subscribe(System.out::println);
結果:
TimeInterval [intervalInMilliseconds=1114, value=0]
TimeInterval [intervalInMilliseconds=92, value=1]
TimeInterval [intervalInMilliseconds=101, value=2]
TimeInterval [intervalInMilliseconds=100, value=3]
TimeInterval [intervalInMilliseconds=99, value=4]
可以看到整個數據流推遲了 1000ms。
同樣還有一個重載函數,可以使用另外一個 Observable 來告訴Subscriber 何時訂閱
public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>> subscriptionDelay)
下面的示例,和上面的是一樣的效果:
Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
.delaySubscription(() -> Observable.timer(1000, TimeUnit.MILLISECONDS))
.timeInterval()
.subscribe(System.out::println);
delay values and subscription
該分類的最后一個操作函數既可以推遲單個數據,也可以推遲訂閱。
public final <U,V> Observable<T> delay(
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T,? extends Observable<V>> itemDelay)
這是一個組合情況,兩個參數分別控制推遲多久訂閱和推遲多久發射數據。
Sample
sample 操作可以把一個數據流分割為一個一個的時間窗口,當每個時間窗口結束的時候,發射該時間窗口中的最后一個數據。
Observable.interval(150, TimeUnit.MILLISECONDS)
.sample(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
結果:
5
12
18
...
每個時間窗口的時間長短也可以不同。你一樣可以使用一個信號 Observable 來指定時間結束:
下面這個示例和前面的效果是一樣的
Observable.interval(150, TimeUnit.MILLISECONDS)
.sample(Observable.interval(1, TimeUnit.SECONDS))
.subscribe(System.out::println);
Throttling
Throttling 也是用來過濾數據的。
throttleFirst
throttleFirst 操作函數接收到第一個數據后,就開啟一個時間窗口,在規定的時間窗口內發射第一個數據,后面的數據丟棄直到時間窗口結束。當時間窗口結束后,下一個數據發射后將開啟下一個時間窗口。
Observable.interval(150, TimeUnit.MILLISECONDS)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
結果:
0
7
14
...
上面示例中,interval 每隔150ms 發射一個數據。從數據流開始,每個數值都是在 (i+1)*150ms 的時候發射。第一個數據 0 在 第150ms 發射,然后后面 1000ms 時間內發射的所有數據都被丟棄了。然后下一個數據在第1200ms 的時候發射,然后后面 1000ms 的數據又被丟棄。
throttleLast
throttleFirst 是根據第一個數據發射后來計時的,而 throttleLast 則使用一樣的時間來分割數據流,發射每個時間窗口內的最后一個數據。
Observable.interval(150, TimeUnit.MILLISECONDS)
.throttleLast(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
結果:
5
12
18
...
上面示例中,當數據流開始的時候,時間窗口也同時開始了。第一個時間窗口在 1000ms 的時候結束,最后一個數據在 900ms的時候產生;下一個時間窗口從 1000ms 到 2000ms,這個時間窗口內的最后一個數據在 1950ms 的時候產生;同樣可以計算下一個時間窗口內的數據在 2850ms 的時候產生。
Debouncing
debounce 操作函數的功能是:當一個數據發射的時候,就開始一個時間窗口計時,當這個時間窗口結束了還沒有新的數據發射,則就發射這數據。如果在這個時間窗口內,又發射了一個新的數據,則當前數據丟棄,從新開始時間窗口計時。
使用簡單的 interval 函數無法演示這種情況。所以需要使用一個稍微復雜的 Observable:
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
)
.scan(0, (acc, v) -> acc+1)
.timeInterval()
.subscribe(System.out::println);
這個 Observable 的結果為:
TimeInterval [intervalInMilliseconds=110, value=0]
TimeInterval [intervalInMilliseconds=1, value=1]
TimeInterval [intervalInMilliseconds=98, value=2]
TimeInterval [intervalInMilliseconds=101, value=3]
TimeInterval [intervalInMilliseconds=502, value=4]
TimeInterval [intervalInMilliseconds=500, value=5]
TimeInterval [intervalInMilliseconds=499, value=6]
TimeInterval [intervalInMilliseconds=102, value=7]
TimeInterval [intervalInMilliseconds=99, value=8]
TimeInterval [intervalInMilliseconds=101, value=9]
從上面的結果中,可以看到我們構造的這個 Observable 先間隔 100ms 發射 4個數字,然后間隔 500ms 再發射 3 個數字,然后再間隔 100ms 發射 3個數字。scan 函數的作用只是把數字轉換為自然順序發射出去,這個可以區分每個發射出去的數字。
下面用 debounce 函數來操作我們這個源 Observable:
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
)
.scan(0, (acc, v) -> acc+1)
.debounce(150, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
結果:
3
4
5
9
這個示例中推遲的時間窗口為 150ms, 前面4個數字都是每隔100ms 就發射了,而第五個數字間隔 500ms 才發射,所以只有第四個數字延遲的時間超過了 150ms 被發射了,同樣,4、5這兩個數字也是延遲 500ms 發射的。
同樣,也可以在每個數據上設置不同的推遲時間窗口,通過一個函數參數來設置每個數據需要推遲的時間。當這個信號 Observable 結束的時候,代表推遲的時間窗口結束。
在下面這個示例中,每個數字i的推遲時間窗口為 i*50 ms
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
)
.scan(0, (acc, v) -> acc+1)
.debounce(i -> Observable.timer(i * 50, TimeUnit.MILLISECONDS))
.subscribe(System.out::println);
結果:
1
3
4
5
9
下面是一個表格,列出了每個數字的推遲時間窗口和該數字和下一個數字之間的時間間隔:
數字 | 計算出的推遲時間窗口 | 距離下一個數字發射的時間 | 時間窗口是否小于發射間隔 |
---|---|---|---|
0 | 0 | 1 | |
1 | 50 | 98 | Yes |
2 | 100 | 101 | (Java 中基于時間的運算并不完全精準) |
3 | 150 | 502 | Yes |
4 | 200 | 500 | Yes |
5 | 250 | 499 | Yes |
6 | 300 | 102 | |
7 | 350 | 99 | |
8 | 400 | 101 | |
9 | 450 | Yes |
這個操作函數非常適合輸入提示的場景,比如在一個具有輔助提示功能的輸入框中,如果每次用戶輸入了一個字符,就立刻去計算提示的內容,則實際用起來會比較煩人,通常情況下都是延遲一段時間后,再去請求提示內容并顯示,如果用戶在延遲的時間還沒到,就輸入了下一個字符,則重新開始新的延遲計時。
Timeout
timeout 用來檢測一個 Observable 是否很長時間內沒有數據發射了。 如果超過了這個時間沒有發射數據,則 timeout 函數會把源 Observable 結束掉并發射一個 TimeoutException 異常。
使用前面使用過的例子來演示 timeout:
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
)
.scan(0, (acc, v) -> acc+1)
.timeout(200, TimeUnit.MILLISECONDS)
.subscribe(
System.out::println,
System.out::println);
結果:
0
1
2
3
java.util.concurrent.TimeoutException
如果超過200ms 還沒有下一個數據發射,則拋出一個 TimeoutException并結束 Observable。
如果超時了,可以指定一個替代的 Observable 而不是拋出 TimeoutException 異常。這樣超時了,就使用 替代的 Observable 繼續發射。源 Observable將會被丟棄。
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
)
.scan(0, (acc, v) -> acc+1)
.timeout(200, TimeUnit.MILLISECONDS, Observable.just(-1))
.subscribe(
System.out::println,
System.out::println);
結果:
0
1
2
3
-1
同樣,也可以每一個數據指定一個不同的超時時間。提供一個函數為每個數據返回一個 信號Observable,當信號 Observable 結束的時候,代表該數據后面的超時時間,如果信號 Observable 結束了,還沒有數據發射,則代表超時了。
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
)
.scan(0, (acc, v) -> acc+1)
.timeout(i -> Observable.timer(200, TimeUnit.MILLISECONDS))
.subscribe(
System.out::println,
System.out::println);
同樣,也可以指定一個替代超時異常的 Observable
.timeout(i -> Observable.timer(200, TimeUnit.MILLISECONDS), Observable.just(-1))
結果和前面的示例是一樣的。