RxJava學習記錄

qiyue 7年前發布 | 8K 次閱讀 RxJava

RxJava是基于JVM實現的Reactive擴展. 它是一種使用Reactor模式實現的, 異步的, 事件驅動的編程方式. 它支持數據/事件序列, 可以使用操作符將序列結合在一起, 而不用關心底層的細節, 比如線程安全, 同步等問題.

Reactor模式

Reactor模式是事件驅動的, 有一個或多個并發的事件輸入源, 有一個Service Handler集中收集輸入事件, 并分發到相應的Request Handler中進行處理.

 

+-------+ +-----------------+
|Input|-----+ +---->|Request Handler|
+-------+ | |+-----------------+
 | |
+-------+ |+-----------------+|+-----------------+
|Input|-----+-->|Service Handler|---+---->|Request Handler|
+-------+ |+-----------------+|+-----------------+
 | |
+-------+ | |+-----------------+
|Input|-----+ +---->|Request Handler|
+-------+ +-----------------+

角色

RxJava主要有2個角色: Observable和Observer, 他們之間通過訂閱建立聯系, Observable發射出數據, 期間通過 操作符 進行處理, 最后被 Observer 得到

  • Observable: 產生數據
  • Observer: 消費數據
  • 操作符: 轉換數據
+------------+
| Observable |
+------------+
|
subscribe
|
+------------+
| Observer |
+------------+

RxJava的執行過程

  • 創建一個Observable
  • 使用各種操作符進行變換
  • 創建一個Observer訂閱Observable

當Observable發射數據時, Observer進行相應的動作

偽代碼如下

Observable
 .operator()
 .operator()
 .operator()
 .operator()
 .subscribe()

Observable和Observer

Observable

Observable是發射數據的來源, 并實現了以下3種方法:

  • onNext()
    Observable調用該方法來發射一條數據
  • onError()
    當遇到錯誤時, Observable會調用該方法. 調用該方法后便不會再調用onNext和onComplete
  • onComplete()
    當Observable最后一次調用onNext完畢后, 會調用該方法.

Observable的 onNext 可以被調用0次或多次

最后, 會調用 onComplete 或者 onError , 這兩者的調用標志著序列結束.

onNext 通常被叫做 發射 , onComplete 和 onError 通常被叫做 通知

冷Observable, 熱Observable, 可連接的Observable

  • 熱Observable
    在創建時就開始發射數據, 如果某個observer在發射中途訂閱了這個Observable, 那么他接收到的數據也從中途開始
  • 冷Observable
    會等待Observer來訂閱它, 然后才開始發射數據
  • 可連接的Observable
    當調用 connect 方法后, 才開始發射數據, 無論是否有Observer對他進行訂閱

Observer

Subscriber是Observer的實現. Subscriber會訂閱Observable發出的事件, 它有兩種基本操作:

  • subscribe
  • unsubscribe

Observable約定

通知

Observable通過以下 通知 與訂閱它的Observer進行通信:

  • onNext
    將Observable發射的數據傳遞給Observer
  • onComplete
    表示Observable成功完成了所有數據的發射, 并將不再發射任何數據
  • onError
    表示Observable由于某種錯誤被終止, 并將不在發射任何數據
  • onSubscribe (可選)
    表示Observable已經準備好接收Observer的請求通知了

Observer通過以下通知來與Observable進行通信:

  • subscribe
    表示Observer已經準備好從Observable接收通知了
  • unsubscribe
    表示Observer不再希望接收Observable的通知
  • request (可選)
    表示Observer希望不再從Observable接收多余某種特定數量的onNext通知

通知的約束

一個Observable可以創建0個或多個onNext通知, 每個通知代表一個單獨發射的數據, 最后跟隨一個onCompleted或onError通知, 兩者之一. 當發出onCompleted或onError通知后, 它將不再發射任何其他通知.

一個Observable可以不發射任何數據. 一個Observable也可以永遠不通過onCompleted和onError來終止. 也就是說, Observable可以不發出任何通知, 或只發出onCompleted或onError通知, 或執法處onNext通知.

Observable必須按順序向Observer發出通知, 而不能并行發出通知. Observable可以在不同的線程中發出通知, 但通知之間必須存在 happens-before 的關系.

Observable的終止

如果一個Observable沒有發出onCompleted或onError通知, 那么Observer會認為該Observable仍然是活動的(即使已經不再發射任何數據), 并且該Observer可能會向其發出通知(比如unsubscribe或是request通知). 當一個Observable發出onCompleted或onError通知時, 該Observable可能會釋放他的資源, 并終止, 他的Observer則不應該再與其進行通信.

onError通知必須包含錯誤的原因 (也就是說, 調用帶有null值的onError是無效的)

在Observable終止之前, 它必須向訂閱它的Observer發出onCompleted或onError之一

subscribe和unsubscribe

Observable在接收到Observer發出的subscribe通知后, 會開始發出自己的通知

當一個Observer向Observable發出unsubscribe通知時, Observable會嘗試停止向該Observer發出通知. 但并不保證這種情況.

當Observable向Observer發出onError或onCompleted通知時, 會終止訂閱關系. Observer不再需要想Observable發出unsubscribe通知

多個Observer

如果有第二個Observer訂閱了Observable, 而這個Observable此時已經向第一個Observer發射了一些數據, 那么該Observable是否會向第二個Observer繼續發射數據, 或者是否將完整的數據序列重新發射給第二個Observer, 或者是否會向第二個Observer發射完全不同的數據, 以上這些都取決于該Observable的設置. 并不會保證訂閱同一個Observable的兩個Observer會接收到相同的數據序列.

背壓(backpressure)

背壓是可選的; 并不是所有的RX語言都實現了背壓, 并且在實現了背壓的RX語言中, 也并不是所有的Observable或操作符會推薦背壓. (@todo 不再翻譯了)

操作符

大多數的操作符都會返回Observable對象, 所以可以利用這一點進行操作符的鏈式調用, 完成一系列的操作

創建操作符

用于創建Observable

  • create
    手動創建一個Observable, 手動調用observer的方法
  • defer
    當Observer進行subscribe時, 才創建Observable, 并且是為每個observer都創建一個新的Observable
  • empty/never/throw
    創建有限行為的Observable
  • from
    將其他對象或數據結構轉換為Observable
  • interval
    創建一個根據指定時間間隔, 發射整形數字隊列的Observable
  • just
    將一個對象或一個對象集合轉換為一個Observable, 并將它們發射
  • range
    創建一個發射某個范圍內整形數字的Observable
  • repeat
    創建一個重復發射某一個或某個序列數據的Observable
  • start
    創建一個發射某個方法返回值的Observable
  • timer
    創建一個每隔一定時間發射一個數據的Observable

變換操作符

用于將Observable發射的數據進行變換

  • buffer
    間接性收集Observable發射的數據, 將這些數據放入bundle中, 并發射這個bundle
  • flatMap
    將Observable發射的多個數據變換為多個Observable, 然后將他們扁平化, 并放入一個Observable中
  • groupBy
    將一個Observable拆分為多個Observable的集合, 每次發射其中一組Observable, 通過key來結組
  • map
    通過某個函數將一個Observable發射的數據進行變換
  • scan
    對一個Observable發射的每個數據都按順序應用某個方法, 并將返回值發射
  • window
    間接地從一個Observable中拆分數據, 放入window中, 并從window中一次發射一個數據

過濾操作符

用于從Observable中有選擇地發射數據

  • debounce
    只發射Observable中指定間隔之后的數據
  • distinct
    忽略Observable中重復的數據
  • elementAt
    只發射Observable中指定位置的數據
  • filter
    只發射Observable中滿足條件的數據
  • first
    只發射Observable中第一個數據, 或第一個滿足條件的數據
  • ignoreElements
    不發射數據, 而只通知結束(onError或onComplete)
  • last
    只發射Observable中最后一個數據
  • sample
    定期采樣Observable的數據, 并發送距離上次采樣時間最近發射的那個數據
  • skip
    忽略Observable的前n個數據
  • skipLast
    忽略Observable的后n個數據
  • take
    發射Observable的前n個數據
  • takeLast
    發射Observable的后n個數據

組合操作符

用于將多個Observable組合為一個單獨的Observable

  • and/then/when
    利用Pattern和Plan作為中介, 將多個Observable發射的數據合并到一個Observable
  • combineLatest
    將兩個Observable最新發射的數據結合, 通過某個方法進行運算, 并將該方法的結果發射出去
  • join
    將兩個Observable發射的, 在同一時間窗口內的數據結合起來
  • merge
    將多個Observable的數據合并為一個Observable
  • startWith
    在發射Observable的數據之前, 先發射指定序列的數據
  • switch
    將發射Observable的多個Observable轉換為一個單獨的Observable
  • zip
    將多個Observable的發射數據通過一個指定的方法結合在一起, 并將通過該方法結合后的數據發射出去

錯誤處理操作符

從錯誤通知中恢復

  • catch
    從onError中恢復, 并繼續執行序列
  • retry
    如果Observable發送了一個onError通知, 則重新subscribe這個Observable

Observable工具操作符

工具

  • delay
    延時后發射
  • do
    注冊一個action來處理Observable的生命周期事件
  • meterialize/dematerialize
    將onNext, onError, onComplete轉換為數據序列, 由Observable發出; 或者反之
  • observeOn
    指定observer觀察Observable所在的線程
  • Serialize
    強制序列同步執行
  • subscribe
    對Observable進行發射和通知操作
  • subscribeOn
    指定當Observable被訂閱時, 所應使用的線程
  • timeInterval
    將打算發射的數據轉換為發射數據的時間
  • timeout
    如果某段時間內沒有發射任何數據, 則發出onError通知
  • timestamp
    為Observable發射的每個數據都附加一個時間戳
  • using
    創建一個一次性的資源, 該資源和Observable具有相同的生命周期

條件和布爾操作符

對Observable和數據進行判斷

  • all
    判斷是否所有發射的數據都符合某個約束
  • amb
    將多個Observable的數據由第一個Observable來發射
  • contains
    判斷某個Observable是否發射了指定的數據
  • defaultIfEmpty
    發射Observable的數據, 如果Observable沒有數據可以發射時, 發射一個默認數據
  • sequenceEqual
    判斷兩個Observable發射的數據序列是否相同
  • skipUntil
    放棄一個Observable所發射的數據, 直到另一個Observable開始發射數據
  • skipWhile
    放棄一個Observable所發射的數據, 直到某個指定條件變為false
  • takeUntil
    在一個Observable開始發射或結束發射數據后, 放棄另一個Observable所發射的數據
  • takeWhile
    當某個指定條件變為false時, 放棄一個Observable所發射的數據

數學和匯總操作符

操作數據序列

  • average
    計算一個Observable發射數據的數量的平均值, 并將該平均值發射
  • concat
    拼接多個Observable所發射的數據, 并將所有數據發射
  • count
    計算Observable所發射數據的數量, 并將該數量發射
  • max
    計算并發射Observable所發射的最大的數據
  • min
    計算并發射Observable所發射的最小的數據
  • Reduce
    對Observable所發射的數據按順序應用一個方法, 并將方法值發射
  • sum
    計算Observable所發射的數據數量總和, 并將該總和值發送

背壓操作符

@todo

連接操作符

特殊的Observable, 擁有更多特性

  • connect
    指示一個可連接的Observable開始發射數據到它的subscriber
  • publish
    將原始Observable轉換為一個可連接的Observable
  • refCount
    使一個可連接的Observable表現的和原始Observable一樣
  • replay
    確保所有Observable發射數據序列的順序是相同的, 即使當subscribe時Observable已經開始發射數據

轉換操作符

  • to
    將一個Observable轉換為另一個對象或數據結構

異步操作符

用于將同步方法轉換為Observable

  • start()
  • toAsync()
    將一個 方法 轉換為 Observable 來執行方法并發射返回值
  • asyncAction()
  • asyncFunc()
  • startFuture()
    將一個 返回值為Future的方法 轉換為一個 Observable , 并發射Future的返回值
  • deferFuture()
    將一個 返回值為Observable的Future 轉換為一個 Observable , 當有Subscriber訂閱時, 才返回Future的Observable返回值(有點繞)
  • forEachFuture()
    將Subscriber方法轉入一個Observable中, 直到complete時才執行
  • fromAction()
    將一個Action轉換為一個Observable, 當Subscriber訂閱時, 執行該動作并發射結果
  • fromCallable()
    將一個Callable轉換為一個 Observable , 當Subscriber訂閱時, 執行callable并發射其結果或異常
  • fromRunnable()
    將一個 Runnable 轉換為一個 Observable , 當Subscriber訂閱時, 執行runnable并發射其結果
  • runAsync()
    返回一個 StoppableObservable , 它可以發射某個Action或Scheduler指定的多個action

操作符的選擇樹

以下可以幫助你選擇合適的操作符:

我想創建一個新的Observable
|_ 它只發射一個數據: `just`
| |_ 該數據是在subscribe時, 由一個方法返回的: `start`
| |_ 該數據是在subscribe時, 由Action, Callable, Runnable或類似的返回的: `from`
| |_ 該數據在某段時間后發射: `timer`
|_ 它是從某個Array, Iterrable或類似的發射數據的: `from`
|_ 它是從一個Future獲取到的: `start`
|_ 它是從一個Future獲取的數據序列: `from`
|_ 它會重復發射數據序列: `repeat`
|_ 它是從某個自定義邏輯創建的: `create`
|_ 它為每個subscribe的Observer都創建一個新的Observable: `defer`
|_ 它發射一個整形數字序列: `range`
| |_ 該序列會根據某種時間間隔發射: `interval`
| |_ 并且會在某段延時后才開始發射: `timer`
|_ 它不發射任何數據就會結束: `empty`
|_ 它什么都不做: `never`
我想通過組合多個Observable來創建一個新的Observable
 |_ 它會發射所有的Observable中的數據, 順序按照數據默認的順序: `merge`
 |_ 它會發射所有的Observable中的數據, 一次只發射一個Observable的數據: `concat`
 |_ 它會將多個Observable的數據按順序組合, 生成一個新的數據來發射
 | |_ 并且將每個Observable發射的數據通過某個方法結合, 發射該方法處理后的數據: `zip`
 | |_ 并且將每個Observable最新發射的數據結合為一個數據進行發射: `combinLatest`
 | |_ 并且將同一window內的數據結合為一個數據進行發射
 | |_ 通過Pattern和Plan中介來發射: `and/then/when`
 |_ 它是從這些Observable最近發射的數據中發射數據: `switch`
我想將Observable的數據進行變換后再發射
|_ 變換的方式是通過某個方法一次發射一個數據: `map`
|_ 變換的方式是發射多個Observable中的所有數據: `flatMap`
| |_ 按照發射時間順序, 一次發射一個Observable: `concatMap`
|_ based on all of the items that preceded them: `scan`
|_ 變換的方式是為每個數據附加一個時間戳: `timestamp`
|_ 變換的方式是將數據轉變為距離上次發射的時間間隔: `timeInterval`
我想延長數據發射時間: `delay`
我想將數據和通知都轉換為該Observable的數據, 并重新進行發射
|_ 通過將他們封裝在Notification對象中: `materialize`
|_ 并且我還可以再次解除封裝: `dematerialize`
我想忽略Observable的所有數據, 只發射complete和error通知: `ignoreElements`
我想復制一個Observable, 并在它的數據序列前添加其他的數據序列: `startWith`
|_ 并且僅在該Observable數據序列為空的情況下才添加其他數據序列: `defaultEmpty`
我想從一個Observable中收集數據, 并通過一個數據的緩沖重新發射: `buffer`
|_ 該緩沖只包含最后一個數據: `takeLastBuffer`
我想將一個Observable拆分為多個Observable: `window`
|_ 并且相似的數據可以在同一個Observable中: `groupBy`
我想從一個Observable發射的數據中獲取某個特定的數據
|_ 要獲取的是在complete前發射的最后一個數據: `last`
|_ 要獲取的是一個單獨的數據: `single`
|_ 要獲取的是第一個發射的數據: `first`
我想重新發射一個Observable中的某些數據
|_ 只要滿足過濾條件的數據: `filter`
|_ 只要第一個數據: `first`
|_ 只要前n個數據: `take`
|_ 只要最后一個數據: `last`
|_ 只要第n個數據: `elementAt`
 |_ 只要前幾個數據之后的數據
| |_ 即, 跳過前n個數據: `skip`
| |_ 即, 直到某個數據滿足某種特定條件之后所發射的數據: `skipWhile`
| |_ 即, 在開始發射某段時間后的數據: `skip`
| |_ 即, 在另一個Observable開始發射數據之后, 原Observable所發射的數據: `skipUtil`
 |_ 只要除最后幾個數據之外的數據
| |_ 即, 除最后n個數據之外的數據: `skipLast`
| |_ 即, 在某個數據滿足某種特定條件之前所發射的數據: `takeWhile`
| |_ 即, 在complete某段時間之前所發射的數據: `skipLast`
| |_ 即, 在另一個Observable開始發射數據之前, 原Observable所發射的數據: `takeUtil`
|_ 只要間歇采樣的數據: `sample`
|_ 只要某段時間內不會再有數據發射的數據: `debounce`
|_ 只要與已發射數據不重復的數據: `distinct`
| |_ if they immediately follow the item they are duplicates of: `distinctUntilChanged`
|_ 只要在Observable開始發射數據后, 我的subscriber進行subscribe以后的數據: `delaySubscription`
如果某個Observable是一個Observable集合的第一個, 則重新發射他的數據: `amb`
我想對Observable發射的數據序列進行判斷
|_ 判斷是否所有數據都滿足條件, 然后發射一個單獨的boolean值: `all`
|_ 判斷是否其中某個數據滿足條件, 然后發射一個單獨的boolena值: `contains`
|_ 判斷是否Observable沒有發射任何數據, 然后發射一個單獨的boolean值: `isEmpty`
|_ 判斷是否該Observable的數據序列和另一個Observable的數據序列一樣, 然后發射一個單獨的boolean值: `sequenceEqual`
|_ 發射所有數據的平均值: `average`
|_ 發射所有數據的總和: `sum`
|_ 發射數據的個數: `count`
|_ 發射數據序列中的最大值: `max`
|_ 發射數據序列中的最小值: `min`
|_ 通過對每個數據應用一個方法, 并發射該方法的結果: `scan`
我想將Observable發射的整個數據序列轉換為另一種數據結構: `to`
我想控制操作符進行操作所在的線程: `subscribeOn`
|_ 想控制通知Observer的線程: `observeOn`
我想創建一個在某種事件發生后, 可以激活某個特定的action的Observable: `do`
我想創建一個可以通知Observer發生錯誤的Observable: `throw`
|_ 如果在某段時間內沒有發射任何數據, 則通知錯誤: `timeout`
我想創建一個可以從錯誤中恢復的Observable
|_ 它可以通過轉換到一個備份Observable來從超時中恢復: `timeout`
|_ 它可以從上游錯誤通知中恢復: `catch`
|_ 通過嘗試重新subscribe上游的Observable: `retry`
我想創建一個與Observable有相同生命周期的對象: `using`
我想subscribe一個Observable, 并一直阻塞, 直到該Observable完成時, 接收一個Future: `start`
我想創建一個Observable, 在subscribe時并不發射數據, 直到我要求它才發射: `publish`
|_ 它只發射最后一個數據: `publishLast`
|_ 它發射全部數據序列, 無論在序列發射后是否有其他進行subscribe: `replay`
|_ 當所有subscriber都取消subscribe時, 我要放棄發射: `refCount`
|_ 我要要求它開始發射: `connect`

Scheduler線程切換

如果你想在操作符中引入多線程, 你可以使用 Schedulers

默認情況下, Observable和操作鏈會在調用 subscribe 方法的線程中進行操作和發出通知. subscribeOn 操作符可以指定操作Observable的具體線程. observeOn 操作符可以指定Observable向Observer發出通知的線程

  • observeOn
    • 指定Observable向Observer發出通知的線程
    • 可以在操作鏈多次調用
    • 作用范圍: 從本次observeOn調用開始, 到下次observeOn操作符結束.
  • subscribeOn
    • 指定操作Observable的具體線程
    • 可以在操作鏈中多次調用
    • 作用范圍: 從 創建操作符 或 doOnSubscribe操作符 開始, 到下次subscribeOn操作符結束

注意: subscribe中的通知回調方法是 observeOn 指定的線程, 而不是subscribeOn指定的線程

 

來自:http://blog.lixplor.com/2016/10/16/rxjava/

 

 本文由用戶 qiyue 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!