RxJava 教程第四部分:并發 之測試

RenKim 8年前發布 | 9K 次閱讀 RxJava 并發 Java開發

在開發軟件的時候,我們需要確保代碼正確執行。為了快速的獲取每次修改后的反饋,通常開發人員使用自定義測試。

在同步的 Rx 中測試和普通 Java 中的單元測試沒有太大的區別。如果要測試異步代碼,可能會有點需要注意的地方,比如要測試下面的代碼:

Observable.interval(1, TimeUnit.SECONDS)
    .take(5)
 

上面的 Observable 發射一個數據流,需要 5秒 來發射完所有的數據。如果我們使用自動化測試這個代碼,則是不是意味著測試代碼也要執行 5秒,如果我們有成千上萬個這樣的測試,測試將消耗很多時間去完成。

TestScheduler

上面示例的代碼,5秒鐘的時間其實大部分都在等待。如果我們可以加快系統時鐘,則可以很快的完成數據流的發射。雖然實際操作中,無法加速系統時鐘,但是可以加速一個虛擬的時鐘。在 Rx 設計中,考慮到只在 scheduler 中使用時間相關的操作。這樣可以用一個虛擬的 TestScheduler 來替代真實的 Scheduler。

TestScheduler 和前面一節介紹的線程調度功能是一樣的。調度的任務要么立刻執行,要么在將來某個時刻執行。區別在于 TestScheduler 中的時間是不動的,只有被調用了時間才會繼續。

advanceTimeTo

advanceTimeTo 函數就是把 TestScheduler 中的時鐘前進指定的時間刻度。

TestScheduler s = Schedulers.test();
 
s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("40s"),
        40, TimeUnit.SECONDS);
 
System.out.println("Advancing to 1ms");
s.advanceTimeTo(1, TimeUnit.MILLISECONDS);
System.out.println("Virtual time: " + s.now());
 
System.out.println("Advancing to 10s");
s.advanceTimeTo(10, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
 
System.out.println("Advancing to 40s");
s.advanceTimeTo(40, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
 

結果:

Advancingto 1ms
Immediate
Virtualtime: 1
Advancingto 10s
Virtualtime: 10000
Advancingto 40s
20s
40s
Virtualtime: 40000
 

上面示例中創建的 3 個任務,第一個任務立刻執行,第二個和第三個在將來執行。可以看到如果不調用 advanceTimeTo 來使時間前進,則所有任務都不會執行,因為在 TestScheduler 中時間是停止的。當時間前進的時候, TestScheduler 會同步執行所有滿足時間條件的任務。

advanceTimeTo 可以設置時間為任意時刻。也可以回到過去(設置的時間比當前的時間還早)。所以這個函數如果不注意使用,可能會導致不可預見的 bug。一般建議使用下面這個函數。

advanceTimeBy

advanceTimeBy 顧名思義,在當前時間基礎上前進多少。

TestScheduler s = Schedulers.test();
 
s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("40s"),
        40, TimeUnit.SECONDS);
 
System.out.println("Advancing by 1ms");
s.advanceTimeBy(1, TimeUnit.MILLISECONDS);
System.out.println("Virtual time: " + s.now());
 
System.out.println("Advancing by 10s");
s.advanceTimeBy(10, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
 
System.out.println("Advancing by 40s");
s.advanceTimeBy(40, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
 

結果:

Advancingby 1ms
Immediate
Virtualtime: 1
Advancingby 10s
Virtualtime: 10001
Advancingby 40s
20s
40s
Virtualtime: 50001
 

triggerActions

triggerActions 不會修改時間。只是用來執行當前可以調度的任務。

TestScheduler s = Schedulers.test();
 
s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);
 
s.triggerActions();
System.out.println("Virtual time: " + s.now());
 

結果:

Immediate
Virtualtime: 0
 

調度沖突

有些任務可能在同一時刻執行。如果發生這種情況,則被稱之為 調度沖突。 這些任務調度的順序就是他們執行的順序(也就是按照順序執行)

TestScheduler s = Schedulers.test();
 
s.createWorker().schedule(
        () -> System.out.println("First"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("Second"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("Third"),
        20, TimeUnit.SECONDS);
 
s.advanceTimeTo(20, TimeUnit.SECONDS);
 

結果:

First
Second
Third
 

測試

Rx 的 Observable的 大部分操作函數都有一個可以指定 Scheduler 的重載形式。在這些函數上同樣可以使用 TestScheduler。

@Test
public void test() {
    TestSchedulerscheduler = new TestScheduler();
    List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L, 4L);
    List<Long> result = new ArrayList<>();
    Observable
        .interval(1, TimeUnit.SECONDS, scheduler)
        .take(5)
        .subscribe(i -> result.add(i));
    assertTrue(result.isEmpty());
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    assertTrue(result.equals(expected));
}
 

這樣測試代碼就可以很開的完成,比較適合測試簡短的 Rx 代碼。在實際代碼中,可以把獲取 Scheduler 的函數封裝起來,在 debug 版本中使用 TestScheduler ,而在發布版本中使用真實的 Scheduler。

TestSubscriber

上面的測試中,我們手工的收集發射的數據并根據期望的數據去對比,來判斷測試是否成功。由于這樣的測試很常見,Rx 提供了一個 TestSubscriber 來幫助簡化測試過程。 前面的測試代碼使用 TestSubscriber 可以變為這樣:

@Test
public void test() {
    TestSchedulerscheduler = new TestScheduler();
    TestSubscriber<Long> subscriber = new TestSubscriber<>();
    List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L, 4L);
    Observable
        .interval(1, TimeUnit.SECONDS, scheduler)
        .take(5)
        .subscribe(subscriber);
    assertTrue(subscriber.getOnNextEvents().isEmpty());
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    subscriber.assertReceivedOnNext(expected);
}
 

TestSubscriber 不僅僅只收集數據,還有如下一些函數:

java.lang.ThreadgetLastSeenThread()
java.util.List<Notification<T>> getOnCompletedEvents()
java.util.List<java.lang.Throwable> getOnErrorEvents()
java.util.List<T> getOnNextEvents()
 

有兩點需要額外注意:一、getLastSeenThread 函數。 TestSubscriber 會檢查在那個線程執行回調函數,并記錄最后一個線程。如果你想測試回調函數是否發生在 GUI 線程,則可以使用這個函數。二、有趣的是 getOnCompletedEvents 可以返回多個結束事件。這是違反 Rx 約定的情況,可以通過測試來檢查。

TestSubscriber 還提供了一些常見的判斷函數:

void assertNoErrors()
void assertReceivedOnNext(java.util.List<T> items)
void assertTerminalEvent()
void assertUnsubscribed()
 

另外還可以阻塞直到特定的事件發生:

void awaitTerminalEvent()
void awaitTerminalEvent(long timeout, java.util.concurrent.TimeUnitunit)
void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, java.util.concurrent.TimeUnitunit)
 

指定時間可能會導致超時的異常(沒有在規定的時間內完成)。

來自: http://blog.chengyunfeng.com/?p=979

 本文由用戶 RenKim 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!