如何形象地描述RxJava中的背壓和流控機制?
之前我在知乎上受邀回答過一個關于RxJava背壓(Backpressure)機制的問題,今天我把它整理出來,希望對更多的人能有幫助。
RxJava的官方文檔中對于背壓(Backpressure)機制比較系統的描述是下面這個:
https://github.com/ReactiveX/RxJava/wiki/Backpressure
但本文的題目既然是要“形象地”描述各個機制,自然會力求表達簡潔,讓人一看就懂。所以,下面我會盡量拋開一些抽象的描述,主要采用打比方的方式來闡明我對于這些機制的理解。
首先,從大的方面說,上面這篇文檔的題目,雖然叫“Backpressure”(背壓),但卻是在講述一個更大的話題——“Flow Control”(流控)。Backpressure只是Flow Control的其中一個方案。
在RxJava中,可以通過對Observable連續調用多個Operator組成一個調用鏈,其中數據從上游向下游傳遞。當上游發送數據的速度大于下游處理數據的速度時,就需要進行Flow Control了。
這就像小學做的那道數學題:一個水池,有一個進水管和一個出水管。如果進水管水流更大,過一段時間水池就會滿(溢出)。這就是沒有Flow Control導致的結果。
Flow Control有哪些思路呢?大概是有四種:
- (1) 背壓(Backpressure)。
- (2) 節流(Throttling)。
- (3) 打包處理。
- (4) 調用棧阻塞(Callstack blocking)。
下面分別詳細介紹。
注意:目前RxJava的1.x和2.x兩個版本序列同時并存,2.x相對于1.x在接口上有很大變動,其中也包括Backpressure的部分。但是,這里要討論的Flow Control機制中的相關概念,卻都是適用的。
Flow Control的幾種思路
背壓(Backpressure)
Backpressure,也稱為Reactive Pull,就是下游需要多少(具體是通過下游的request請求指定需要多少),上游就發送多少。這有點類似于TCP里的流量控制,接收方根據自己的接收窗口的情況來控制接收速率,并通過反向的ACK包來控制發送方的發送速率。
這種方案只對于所謂的cold Observable有效。cold Observable指的是那些允許降低速率的發送源,比如兩臺機器傳一個文件,速率可大可小,即使降低到每秒幾個字節,只要時間足夠長,還是能夠完成的。相反的例子是音視頻直播,數據速率低于某個值整個功能就沒法用了(這種就屬于hot Observable了)。
節流(Throttling)
節流(Throttling),說白了就是丟棄。消費不過來,就處理其中一部分,剩下的丟棄。還是舉音視頻直播的例子,在下游處理不過來的時候,就需要丟棄數據包。
而至于處理哪些和丟棄哪些數據,就有不同的策略。主要有三種策略:
- sample (也叫throttleLast)
- throttleFirst
- debounce (也叫throttleWithTimeout)
從細的方面分別解釋一下。
sample,采樣。類比一下音頻采樣,8kHz的音頻就是每125微秒采一個值。sample可以配置成,比如每100毫秒采樣一個值,但100毫秒內上游可能過來很多值,選哪個值呢,就是選最后那個值。所以它也叫throttleLast。
throttleFirst跟sample類似,比如還是每100毫秒采樣一個值,但選這100毫秒內的第一個值。在Android開發中有時候可以把throttleFirst用作點擊事件的防抖動處理,就是因為它可以在指定的一段時間內處理第一個點擊事件(即采樣第一個值),但丟棄后面的點擊事件。
debounce,也叫throttleWithTimeout,名字里就包含一個例子。比如,一個網絡程序維護一個TCP連接,不停地收發數據,但中間沒數據可以收發的時候,就有間歇。這段間歇的時間,可以稱為idle time。當idle time超過一個預設值的時候,就算超時了(time out),這個時候可能就需要把連接斷開了。實際上一些做server端的網絡程序就是這么工作的。每收發一個數據包之后,啟動一個計時器,等待一個idle time。如果計時器到時之前,又有收發數據包的行為,那么計時器重置,等待一個新的idle time;而如果計時器時間到了,就超時了(time out),這個連接就可以關閉了。debounce的行為,跟這個非常類似,可以用它來找到那些連續的收發事件之后的idle time超時事件。換句話說,debounce可以把連續發生的事件之間的較大的間歇找出來。
打包處理
打包就是把上游來的小包裹打成大包裹,分發到下游。這樣下游需要處理的包裹的個數就減少了。RxJava中提供了兩類這樣的機制:buffer和window。
buffer和window的功能基本一樣,只是輸出格式不太一樣:buffer打包后的包裹用一個List表示,而window打包后的包裹又是一個Observable。
調用棧阻塞(Callstack blocking)
這是一種特殊情況,阻塞住整個調用棧(Callstack blocking)。之所以說這是一種特殊情況,是因為這種方式只適用于整個調用鏈都在一個線程上同步執行的情況,這要求中間的各個operator都不能啟動新的線程。在平常使用中這種應該是比較少見的,因為我們經常使用subscribeOn或observeOn來切換執行線程,而且有些復雜的operator本身也會在內部啟動新的線程來處理。另外,如果真的出現了完全同步的調用鏈,前面的另外三種Flow Control思路仍然可能是適用的,只不過這種阻塞的方式更簡單,不需要額外的支持。
這里舉個例子把調用棧阻塞和前面的Backpressure比較一下。“調用棧阻塞”相當于很多車行駛在盤山公路上,而公路只有一條車道。那么排在最前面的第一輛車就擋住了整條路,后面的車也只能排在后面。而“Backpressure”相當于銀行辦業務時的窗口叫號,窗口主動叫某個號過去(相當于請求),那個人才過去辦理。
如何讓Observable支持Backpressure?
在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable可以通過一些operator來轉化成支持Backpressure的Observable。這些operator包括:
- onBackpressureBuffer
- onBackpressureDrop
- onBackpressureLatest
- onBackpressureBlock(已過期)
它們轉化成的Observable分別具有不同的Backpressure策略。
而在RxJava 2.x中,Observable不再支持Backpressure,而是改用Flowable來專門支持Backpressure。上面提到的四種operator的前三種分別對應Flowable的三種Backpressure策略:
- BackpressureStrategy.BUFFER
- BackpressureStrategy.DROP
- BackpressureStrategy.LATEST
onBackpressureBuffer是不丟棄數據的處理方式。把上游收到的全部緩存下來,等下游來請求再發給下游。相當于一個水庫。但上游太快,水庫(buffer)就會溢出。
onBackpressureDrop和onBackpressureLatest比較類似,都會丟棄數據。這兩種策略相當于一種令牌機制(或者配額機制),下游通過request請求產生令牌(配額)給上游,上游接到多少令牌,就給下游發送多少數據。當令牌數消耗到0的時候,上游開始丟棄數據。但這兩種策略在令牌數為0的時候有一點微妙的區別:onBackpressureDrop直接丟棄數據,不緩存任何數據;而onBackpressureLatest則緩存最新的一條數據,這樣當上游接到新令牌的時候,它就先把緩存的上一條“最新”數據發送給下游。可以結合下面兩幅圖來理解。
onBackpressureBlock是看下游有沒有需求,有需求就發給下游,下游沒有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的情況了),自己并不緩存。這種策略已經廢棄不用。
本文重點在于以宏觀的角度來描述和對比RxJava中的Flow Control機制和Backpressure的各種機制,很多細節沒有涉及。比如,buffer和window除了能把一段時間內收到的數據打包,還能把固定數量的數據進行打包。再比如,onBackpressureDrop和onBackpressureLatest在一次收到下游多條數據的請求時分別會如何表現,本文沒有詳細說明。大家可以查閱相應的API Reference來獲得答案,也歡迎留言與我一起討論。
(完)
來自:http://zhangtielei.com/posts/blog-rxjava-backpressure.html