使用 UT 高效地玩轉 RxJava 的操作符

MayLovely 8年前發布 | 11K 次閱讀 RxJava

RxJava 博大精深,想要入門和進階,操作符是一個切入點。 所以,我們希望尋找一種可以把操作符寫得比較爽,同時可以快速驗證輸入輸出是否準確的玩法。思路有以下兩點:

  1. 使用 UT 來對每一個操作符進行實現,如此一來可以脫離 Android 平臺的依賴,專注于操作符本身。
  2. 對于每一種操作符,使用 RX Marbles ,或者 RxJava 官方 的彈珠圖(marble diagrams)進行實現。

比如下面兩張圖,分別來自 RX Marbles 和官方的彈珠圖,我們要做的就是用 UT 有目的性、精確地 實現這兩張圖的輸入和輸出。

所謂有目的性、精確地輸入輸出,意思就是根據所有操作符的彈珠圖的每條數據流,以及操作符的含義,嚴格按照圖片表達的意思進行代碼的實現。通過這種方強迫癥一般的方式,對理解操作符和 RxJava 的體系有很大的幫助。

(一)預備知識

我們希望把精力專注于操作符的實現,而不是 UT 的技巧,但由于 RxJava 的異步特性,有很多操作符是跟線程相關的,因此我們要先掌握 UT 中如何對線程進行處理的預備知識。

讓測試線程最晚結束

在線程相關的測試代碼中,有個很棘手的現象是:測試線程早于子線程執行完畢,如下代碼:

@Test
public void test_thread_early() {

    //測試線程啟動
    System.out.println("測試線程-start");

    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("子線程-start");
            OperatorUtils.sleep(3000);
            System.out.println("子線程-end");
        }
    }).start();

    //測試線程結束后,子線程還未執行完畢,因此子線程無法完整的輸出測試結果
    System.out.println("測試線程-end");
}

在上述代碼中,測試線程瞬間就執行完畢了,而子線程需要執行3s,測試線程早于子線程執行完畢,因此子線程將無法完整的執行,因此,輸出的結果是:

測試線程-start
測試線程-end
子線程-start

于此對應的,我們來看看 RxJava 操作符的例子,通過 timer 操作符實現延遲3s發送數據:

@Test
public void test_thread_early_observable() {
    System.out.println("測試線程-start,所在線程:" + Thread.currentThread().getName());

    //消息源在Schedulers.computation()線程中執行,3s后執行,此時測試線程已經執行完畢,無法正常輸出結果
    Observable.timer(3, TimeUnit.SECONDS)
            .subscribe(num -> {
                System.out.println("Observable和Subscriber所在線程:" + Thread.currentThread().getName());
                System.out.println("獲取訂閱數據:" + num);

            });
    System.out.println("測試線程--end");
}

與上面的代碼一樣,由于測試線程早早的結束了, timer 操作符所在的線程 Schedulers.computation() 將無法完整地執行完畢,因此輸出的結果是:

測試線程-start,所在線程:main
測試線程-end

如果無法保證所有線程都執行完畢,便無法得到預期的輸出結果。那么,如何解決這個問題?有種最笨的方法便是讓測試線程成為最晚結束的線程,我們為測試線程增加類似于 Thread.sleep(4000) 的邏輯,便可保證以上兩份代碼可以在正常輸出。(此文不希望涉及太多的測試技巧,如果需要更嚴謹和更強大的線程異步測試,可以參考些第三方框架,如 awaitility

使用TestScheduler操縱時間

除了這種笨方法之外,RxJava 提供了 TestScheduler ,通過這個調度器可以實現對時間的操縱。

對于上文提到的 timer 操作符,通過 testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) 可以將時間提前3s,此時測試線程和 timer 操作符所在的線程均可順利的執行完畢,完整代碼如下:

@Test
public void test_thread_with_TestScheduler() {

    TestScheduler testScheduler = Schedulers.test();
    System.out.println("測試線程:" + Thread.currentThread().getName());

    //指定調度器
    Observable.timer(3, TimeUnit.SECONDS, testScheduler)
            .subscribe(num -> {
                System.out.println("Observable和Subscriber線程:" + Thread.currentThread().getName());
                System.out.println("獲取訂閱數據:" + num);
            });

    //將時間提前了3s
    testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
}

聚合操作符的線程處理

很多聚合操作符,如 merge 、 zip 等,需要在多個不同的線程中構造不同的數據流,從而體現數據流發送的先后關系,以及所對應的不同的輸出結果。如何讓多個線程完整的執行完畢?結合上文所講的 讓測試線程最晚結束 以及 使用 TestScheduler 便可做到。筆者在下文的聚合操作符一節中將會具體講解。

有了這些 預備知識 ,基本上可以實現 RxJava 的所有操作符,接下來針對不同類型的操作符分別舉例一二進行講解。

(二)不同類型的操作符實現

interval

interval 作為創建型的操作符,具備間隔一段時間發送數據的能力,是我們寫其他操作符的基礎,因此先來講解下 interval 。

這張圖要表達的意思很簡單,自頂而下的分析如下:

  1. 操作符:由于 interval 操作符是創建型的,因此直接調用操作符來產生數據流,根據 api 參數,需定義其間隔時長,這個數值我們設置為100ms。
  2. 輸入:執行了 Observable.interval() 之后,每間隔指定時間將輸出 0、1、2、3…… 的無窮數據(注:通過彈珠圖,可以看到第一個數據也是有間隔時間的)。
  3. 輸出:即數據消費者,在 RxJava 中體現為 Subscriber 。這張圖里并沒有畫出輸出的數據流,為了觀察輸出,我們自定義訂閱者。
  4. 實現思路: interval 默認在 Schedulers.computation() 線程中執行,執行的時間將會超過測試線程,根據上文的「預備知識」這一節所述,我們使用 TestScheduler 來操縱時間,比如,為了輸出4個數據, interval 需要4個單位的間隔時間(400ms),將時間提前400ms可輸出我們想要的結果。具體實現如下:

    @Test
    public void interval() {
     Observable.interval(100, TimeUnit.MILLISECONDS, mTestScheduler)
             .subscribe(mList::add);
    
     //時間提早400ms前
     mTestScheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
     assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L));
    
     //時間提早(400 + 200)ms前
     mTestScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
     assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L));
    }

以此類推, range 、 just 、 repeat 等創建型的操作符均可按照這種方式實現彈珠圖,這類操作符的實現代碼請查看: CreatingOperatorsTest.java

delay

delay 是工具類型的操作符,可以對數據流進行延時發送。

與創建型操作符 interval 彈珠圖不一樣, delay 有輸入和輸出兩條數據流,中間是操作符的轉換過程。輸入需借助創建型操作符實現(如 just ),輸出則由訂閱者完成。

  1. 輸入:使用比較簡單的 just 操作符,即 Observable.just(1, 2, 1) 。
  2. 輸出:經過 delay 的變換,在延遲指定的時間之后,輸出與輸入一致的輸入流。
  3. 實現思路:此操作符也是與時間相關的操作符,通用 TestScheduler 來操縱時間,并且驗證「延時時間內」和「超過延時時間」是否有數據流輸出。代碼如下:

    @Test
    public void delay() {
     Observable.just(1, 2, 1)
             .delay(3000, TimeUnit.SECONDS, mTestScheduler)
             .subscribe(mList::add);
    
     mTestScheduler.advanceTimeBy(2000, TimeUnit.SECONDS);
     System.out.println("after 2000ms,result = " + mList);
     assertTrue(mList.isEmpty());
    
     mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
     System.out.println("after 3000ms,result = " + mList);
     assertEquals(mList, Arrays.asList(1, 2, 1));
    }

工具型的操作符還有非常多,比如變換線程的 observeOn 和 subscribeOn ,比如 Observable 生命周期的事件監聽操作符 doOnSubscribe 、 doOnNext 、 doOnCompleted ,延遲訂閱的 delaySubscription 等。

amb

amb 是條件型的操作符(Conditional Operators),滿足一定的條件數據流才會開始發送,而 amb 需要滿足的條件便是:多個數據流中最早產生數據的數據流進行發送,彈珠圖也明確地表達出了這層含義。

使用 UT 高效地玩轉 RxJava 的操作符

  1. 輸入:這里有3條數據流,開始發送數據的時間各不一樣,通過之前的操作符講解,這里使用 just + delay 即可實現。
  2. 輸出:經過 amb 變化后,輸出了最早發送數據的數據流,即第二條數據流。
  3. 實現思路:通過 delay 操作符分別延時500s、200s和1000s,然后通用 TestScheduler 將時間提早1000s,訂閱數據流后,驗證下輸出。代碼如下:

    @Test
    public void amb() {
     Observable<Integer> o1 = Observable.just(20, 40, 60)
             .delay(500, TimeUnit.SECONDS, mTestScheduler);
    
     Observable<Integer> o2 = Observable.just(1, 2, 3)
             .delay(200, TimeUnit.SECONDS, mTestScheduler);
    
     Observable<Integer> o3 = Observable.just(0, 0, 0)
             .delay(1000, TimeUnit.SECONDS, mTestScheduler);
    
     Observable.amb(o1, o2, o3)
             .subscribe(mList::add);
    
     mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
     assertEquals(mList, Arrays.asList(1, 2, 3));
    }

以此類推,更多條件行的操作符,如 skipUntil 、 takeUntil 等,請查看 ConditionalAndBooleanOperatorsTest.java

buffer

buffer 是轉換型的操作符,他可以將單個數據緩存起來,批量發送,發送的數據類型是 List 。

buffer

上圖要表達的意思很明確,發送6個數據,每三個做一次緩存,然后批量發送,代碼實現如下:

@Test
public void buffer() {

    Observable.just(1, 2, 3, 4, 5, 6)
            .buffer(3)
            .subscribe(mList::add);

    System.out.println(mList);
    List<List<Integer>> exceptList = Arrays.asList(Arrays.asList(1, 2, 3),
            Arrays.asList(4, 5, 6));
    assertEquals(mList, exceptList);
}

flatMap 和 concatMap

接下來,來對比一組轉換型的操作符: flatMap 和 concatMap ,這兩者充分體現了 marble diagrams 給我們帶來的各種有價值的信息。以下是這兩個操作符的 marble diagrams:

flatMap

concatMap

  1. 輸入:兩者完全一模一樣的輸入,這里要重點關注彈珠的顏色,顏色代表了數據流的順序。
  2. 輸出:輸入的數據流經過變換后,每份數據都變成了兩份,此外, flatMap 變換后,綠色的◇和藍色◇是交叉的,而 concatMap 則保持了與輸入一致的順序 ,這個細節決定了我們如何來實現這兩張圖。
  3. 實現思路:在 flatMap 或 concatMap 之后,3個數據變成了6個數據,假設輸入為 1、2、3 ,則輸出為 1、1、2、2、3、3 ,我們要想辦法讓變換后的輸出有時間差,即按照 1、1、2、3、2、3 的順序輸出,思考再三, interval 可以實現這個場景,將原始的輸入流1、2、3分別作為 interval 時間間隔的變量,來模擬交叉的輸出。具體實現如下:
@Test
public void flatMap() {
    Observable.just(1, 2, 3)
            .flatMap((Func1<Integer, Observable<?>>) num -> Observable.interval(num - 1,
                    TimeUnit.SECONDS, mTestScheduler)
                    .take(2)
                    .map(value -> num + "◇"))
            .subscribe(mList::add);

    mTestScheduler.advanceTimeBy(100, TimeUnit.SECONDS);
    assertEquals(mList, Arrays.asList("1◇", "1◇", "2◇", "3◇", "2◇", "3◇"));
    System.out.println(mList);
}

上述代碼中,只需把 flatMap 修改為 concatMap ,便可獲得 "1◇", "1◇", "2◇", "2◇", "3◇", "3◇" 的數據流,與彈珠圖所要表達的意思完全一致。通過這個例子,我們可以感受到,彈珠圖包含了操作符的諸多細節,嚴謹地實現彈珠圖的輸入輸出,可以更深入的了解操作符。

debounce

debounce 是過濾型的操作符,所以會按一定的規則過濾數據流。這個規則是:Observable每產生一個結果后,如果在規定的間隔時間內沒有別的結果產生,則把這個結果提交給訂閱者處理,否則忽略該結果。

  1. 輸入:對于輸入的數據流可以這樣定義:先產生 1 的數據,間隔500ms后產生 2、3、4、5 ,再間隔500ms,產生 6 ,使用 create 操作符結合 Thread.sleep() 來實現輸入。
  2. 輸出: debounce 的間隔時間設置為400ms,在三段間隔周期內,將依次輸出 1、5、6 。具體代碼如下:
@Test
public void debounce() {

    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            OperatorUtils.sleep(500);

            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onNext(5);

            OperatorUtils.sleep(500);
            subscriber.onNext(6);
            subscriber.onCompleted();
        }
    })
            .subscribeOn(mTestScheduler)
            .doOnNext(System.out::println)
            .debounce(400, TimeUnit.MILLISECONDS)
            .subscribe(mList::add);

    // 測試線程將時間提早10ms,可以保證create操作符順利執行完畢
    mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    System.out.println(mList);
    assertEquals(mList, Arrays.asList(1, 5, 6));
}

以此類推,按照這種方式可以實現 sample 、 throttleFirst 、 throttleLast 等過濾型的操作符,具體代碼請查看: FilteringOperatorsTest.java

merge

merge 是聚合型的操作符。既然是聚合,因此需要2條以上的數據流,聚合之后,輸出一條全新的數據流。

  1. 輸入:兩條數據流,并且要重點關注下數據發送的順序。
  2. 輸出:根據輸入的數據順序,原封不動的合并之后,進行輸出。
  3. 實現思路:兩條數據流均使用 interval 創建,第一條的間隔時間定義為5s,第二條數據流在第一條數據流產生了三個數據之后才發出第一個數據,因此時間間隔設置為18s,具體實現如下:
@Test
public void merge() {
    Observable<Long> observable1 = Observable.interval(5, TimeUnit.SECONDS, mTestScheduler)
            .take(5)
            .map(aLong -> (aLong + 1) * 20)
            .doOnNext(System.out::println);

    Observable<Long> observable2 = Observable.interval(18, TimeUnit.SECONDS, mTestScheduler)
            .take(2)
            .map(aLong -> 1L)
            .doOnNext(System.out::println);

    Observable.merge(observable1, observable2).subscribe(mList::add);

    mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
    assertEquals(mList, Arrays.asList(20L, 40L, 60L, 1L, 80L, 100L, 1L));
}

combineLatest

combineLatest 是聚合型的操作符, 其聚合的規則是:每條數據流中的每個數據都要與另外一條數據流已發送的最近的數據進行兩兩結合。

構造出如彈珠圖所示的兩條數據流,重點在于制造時間差和多線程:

  • 使用 create + Thread.sleep() 來制造數據流產生的時間差。
  • 讓兩條數據流在不同的線程中發送數據,使用 subscribeOn 操作符可以實現線程的調度。

  • 首先是第一條數據流的構造,讓其在 TestScheduler.test() 線程中產生數據(其實便是測試線程,增加了操縱時間的能力),代碼如下:

    Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
    
     @Override
     public void call(Subscriber<? super Integer> subscriber) {
         OperatorUtils.logThread("observable1");
         subscriber.onNext(1);
         OperatorUtils.sleep(500);
         subscriber.onNext(2);
         OperatorUtils.sleep(1500);
         subscriber.onNext(3);
         OperatorUtils.sleep(250);
         subscriber.onNext(4);
         OperatorUtils.sleep(500);
         subscriber.onNext(5);
         subscriber.onCompleted();
     }
    }).subscribeOn(mTestScheduler).doOnNext(System.out::println);
  • 其次是第二條數據流,將其生產數據的線程定義為 Schedulers.newThread() ,代碼如下:
    Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
     @Override
     public void call(Subscriber<? super String> subscriber) {
         OperatorUtils.logThread("observable2");
         OperatorUtils.sleep(250);
         subscriber.onNext("A");
         OperatorUtils.sleep(300);
         subscriber.onNext("B");
         OperatorUtils.sleep(500);
         subscriber.onNext("C");
         OperatorUtils.sleep(100);
         subscriber.onNext("D");
         subscriber.onCompleted();
     }
    }).subscribeOn(Schedulers.newThread()).doOnNext(System.out::println);
  • 前面2點完成了輸入,接下來就是進行聚合變換,以及消費數據,產生輸出,并驗證與彈珠圖的輸出一致。
    (Func2<Integer, String, Object>) (integer, s) -> integer + s).subscribe(mList::add);
    //測試線程提前一定時間,讓observable1能順利開始發送數據
    mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    System.out.println(mList);
    assertEquals(mList, Arrays.asList("1A", "2A", "2B", "2C", "2D", "3D", "4D", "5D"));

與 merge 和 combineLatest 類似,我們可以依次實現 zip 、 switchOnNext 、 withLatestFrom 等聚合型操作符,并了解他們之間的區別。聚合型的操作符所有代碼請前往: CombiningOperatorsTest.java

connect

之前介紹的創建型操作符均創建了 cold 類型的 Observable ,其特點是只有訂閱者訂閱數據時,數據流才會開始發送數據。于此相反,hot 類型的 Observable 不管有沒有訂閱者,都可以直接開始發送數據。 publish 和 connect 是與 hot Observable 相關的一類操作符。

使用 UT 高效地玩轉 RxJava 的操作符

這張彈珠圖并不好理解,但如果能完整實現,對 hot Observable 的便能了然于胸。這張圖中,輸出有三條數據流,代表有三個訂閱者,但是訂閱的時間不一致,最終接收到的數據也不一致,此外,這張圖中,體現了 publish 和 connect 兩種操作符。

  1. 輸入:數據流的產生比較清晰,用上文講過的創建型操作符即可實現。由于需要時間差,因此采用 interval 來產生數據流,時間間隔定義為3s。此外, interval 產生的數據流是 cold 類型的,如何由 cold 變成 hot,其實這便是 publish 操作符要做的事情。
  2. 輸出:輸出的信息量比較大,我們需要好好捋一捋:
    • 首先可以明確有三個訂閱者,且訂閱的時間各不一樣。延時訂閱可以使用 delaySubscription 操作符。
    • 第一個訂閱者即刻訂閱,不延時,而他在訂閱時,數據流還未開始發送數據,因此可以訂閱到完整的數據流。
    • 第一個訂閱者的數據流中有個操作符不可忽視—— connect ,他決定著 Observable 何時開始發送數據。根據圖中所示,將時間定義為2秒后。
    • 第二個訂閱者在數據發送了2個之后才開始訂閱,因此將訂閱時間設置為延遲6秒訂閱。他將只能訂閱到最后一個數據。
    • 第三個訂閱者與第一個區別并不大,我們將他定義為延時1秒后訂閱。

完整的代碼實現如下:

public void connect() {

    List<Integer> list1 = new ArrayList<>();
    List<Integer> list2 = new ArrayList<>();
    List<Integer> list3 = new ArrayList<>();

    //構造1,2,3的數據流,每隔3s發射數據
    ConnectableObservable<Integer> connectableObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            OperatorUtils.sleep(3000);
            subscriber.onNext(2);
            OperatorUtils.sleep(3000);
            subscriber.onNext(3);
        }
    }).publish();

    System.out.println("Subscriber1-0s后開始訂閱數據");
    //立刻訂閱完整的數據流
    connectableObservable.doOnNext(num -> System.out.println("Subscriber1-->" + num))
            .subscribe(list1::add);

    //延遲6s后再訂閱,將只訂閱到3的數據流
    connectableObservable.delaySubscription(6, TimeUnit.SECONDS, Schedulers.newThread())
            .doOnSubscribe(()->{
                System.out.println("Subscriber2-6s后開始訂閱數據");
            })
            .doOnNext(num -> System.out.println("Subscriber2-->" + num))
            .subscribe(list2::add);

    //延遲1s后再訂閱,將只訂閱到3的數據流
    connectableObservable.delaySubscription(1, TimeUnit.SECONDS, Schedulers.newThread())
            .doOnSubscribe(()->{
                System.out.println("Subscriber3-1s后開始訂閱數據");
            })
            .doOnNext(num -> System.out.println("Subscriber3-->" + num))
            .subscribe(list3::add);


    //延時2s執行connect()
    OperatorUtils.sleep(2000);
    System.out.println("Observable 2s后觸發connect()");
    connectableObservable.connect();

    assertEquals(list1, Arrays.asList(1, 2, 3));
    assertEquals(list2, Collections.singletonList(3));
    assertEquals(list3, Arrays.asList(1, 2, 3));
}

以此類推,可以實現其他與 hot Observable 相關的操作符,如 refCount、replay、cache 等。

其他類型的操作符

除了上文介紹的7種不同類型的操作符之外,還有錯誤處理類型(如 retry 、 retryWhen )、背壓類型(如 onBackpressureBuffer )、Convert 類型(如 toList 、 toMap )的操作符未涉及到,以及一些彈珠圖無法完全詮釋操作符本身的諸多細節的講解,篇幅所限,請移步這篇文章查看。

(三)本文代碼

目前已經實現了的彈珠圖(marble diagrams)的操作符種類如下:

(四)結束語

授人以魚不如授人以漁。本文側重介紹一種學習 RxJava 、全面且深入了解操作符的方式,總結起來有如下關鍵點:

  1. 使用 UT 實現,消除對 Android 的依賴,且不要涉及太多的測試技巧,專注于操作符的實現。
  2. 有目的性且嚴謹地實現輸入輸出。每個操作符,讀懂 marble diagrams ,并通過代碼實現。
  3. marble diagrams 圖片來自于 RX Marbles ,或者 RxJava 官方 。
  4. 一些有更深層次含義或細節的,marble diagrams 無法完整詮釋的,如 defer , retryWhen ,查閱更多的文章實現。

參考文章

 

來自:http://www.jianshu.com/p/5b774424b393

 

 本文由用戶 MayLovely 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!