RxJava 教程第四部分:并發 之測試
在開發軟件的時候,我們需要確保代碼正確執行。為了快速的獲取每次修改后的反饋,通常開發人員使用自定義測試。
在同步的 Rx 中測試和普通 Java 中的單元測試沒有太大的區別。如果要測試異步代碼,可能會有點需要注意的地方,比如要測試下面的代碼:
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
上面的 Observable 發射一個數據流,需要 5秒 來發射完所有的數據。如果我們使用自動化測試這個代碼,則是不是意味著測試代碼也要執行 5秒,如果我們有成千上萬個這樣的測試,測試將消耗很多時間去完成。
TestScheduler
上面示例的代碼,5秒鐘的時間其實大部分都在等待。如果我們可以加快系統時鐘,則可以很快的完成數據流的發射。雖然實際操作中,無法加速系統時鐘,但是可以加速一個虛擬的時鐘。在 Rx 設計中,考慮到只在 scheduler 中使用時間相關的操作。這樣可以用一個虛擬的 TestScheduler 來替代真實的 Scheduler。
TestScheduler 和前面一節介紹的線程調度功能是一樣的。調度的任務要么立刻執行,要么在將來某個時刻執行。區別在于 TestScheduler 中的時間是不動的,只有被調用了時間才會繼續。
advanceTimeTo
advanceTimeTo 函數就是把 TestScheduler 中的時鐘前進指定的時間刻度。
TestScheduler s = Schedulers.test();
s.createWorker().schedule(
() -> System.out.println("Immediate"));
s.createWorker().schedule(
() -> System.out.println("20s"),
20, TimeUnit.SECONDS);
s.createWorker().schedule(
() -> System.out.println("40s"),
40, TimeUnit.SECONDS);
System.out.println("Advancing to 1ms");
s.advanceTimeTo(1, TimeUnit.MILLISECONDS);
System.out.println("Virtual time: " + s.now());
System.out.println("Advancing to 10s");
s.advanceTimeTo(10, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
System.out.println("Advancing to 40s");
s.advanceTimeTo(40, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
結果:
Advancingto 1ms
Immediate
Virtualtime: 1
Advancingto 10s
Virtualtime: 10000
Advancingto 40s
20s
40s
Virtualtime: 40000
上面示例中創建的 3 個任務,第一個任務立刻執行,第二個和第三個在將來執行。可以看到如果不調用 advanceTimeTo 來使時間前進,則所有任務都不會執行,因為在 TestScheduler 中時間是停止的。當時間前進的時候, TestScheduler 會同步執行所有滿足時間條件的任務。
advanceTimeTo 可以設置時間為任意時刻。也可以回到過去(設置的時間比當前的時間還早)。所以這個函數如果不注意使用,可能會導致不可預見的 bug。一般建議使用下面這個函數。
advanceTimeBy
advanceTimeBy 顧名思義,在當前時間基礎上前進多少。
TestScheduler s = Schedulers.test();
s.createWorker().schedule(
() -> System.out.println("Immediate"));
s.createWorker().schedule(
() -> System.out.println("20s"),
20, TimeUnit.SECONDS);
s.createWorker().schedule(
() -> System.out.println("40s"),
40, TimeUnit.SECONDS);
System.out.println("Advancing by 1ms");
s.advanceTimeBy(1, TimeUnit.MILLISECONDS);
System.out.println("Virtual time: " + s.now());
System.out.println("Advancing by 10s");
s.advanceTimeBy(10, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
System.out.println("Advancing by 40s");
s.advanceTimeBy(40, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());
結果:
Advancingby 1ms
Immediate
Virtualtime: 1
Advancingby 10s
Virtualtime: 10001
Advancingby 40s
20s
40s
Virtualtime: 50001
triggerActions
triggerActions 不會修改時間。只是用來執行當前可以調度的任務。
TestScheduler s = Schedulers.test();
s.createWorker().schedule(
() -> System.out.println("Immediate"));
s.createWorker().schedule(
() -> System.out.println("20s"),
20, TimeUnit.SECONDS);
s.triggerActions();
System.out.println("Virtual time: " + s.now());
結果:
Immediate
Virtualtime: 0
調度沖突
有些任務可能在同一時刻執行。如果發生這種情況,則被稱之為 調度沖突。 這些任務調度的順序就是他們執行的順序(也就是按照順序執行)
TestScheduler s = Schedulers.test();
s.createWorker().schedule(
() -> System.out.println("First"),
20, TimeUnit.SECONDS);
s.createWorker().schedule(
() -> System.out.println("Second"),
20, TimeUnit.SECONDS);
s.createWorker().schedule(
() -> System.out.println("Third"),
20, TimeUnit.SECONDS);
s.advanceTimeTo(20, TimeUnit.SECONDS);
結果:
First
Second
Third
測試
Rx 的 Observable的 大部分操作函數都有一個可以指定 Scheduler 的重載形式。在這些函數上同樣可以使用 TestScheduler。
@Test
public void test() {
TestSchedulerscheduler = new TestScheduler();
List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L, 4L);
List<Long> result = new ArrayList<>();
Observable
.interval(1, TimeUnit.SECONDS, scheduler)
.take(5)
.subscribe(i -> result.add(i));
assertTrue(result.isEmpty());
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
assertTrue(result.equals(expected));
}
這樣測試代碼就可以很開的完成,比較適合測試簡短的 Rx 代碼。在實際代碼中,可以把獲取 Scheduler 的函數封裝起來,在 debug 版本中使用 TestScheduler ,而在發布版本中使用真實的 Scheduler。
TestSubscriber
上面的測試中,我們手工的收集發射的數據并根據期望的數據去對比,來判斷測試是否成功。由于這樣的測試很常見,Rx 提供了一個 TestSubscriber 來幫助簡化測試過程。 前面的測試代碼使用 TestSubscriber 可以變為這樣:
@Test
public void test() {
TestSchedulerscheduler = new TestScheduler();
TestSubscriber<Long> subscriber = new TestSubscriber<>();
List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L, 4L);
Observable
.interval(1, TimeUnit.SECONDS, scheduler)
.take(5)
.subscribe(subscriber);
assertTrue(subscriber.getOnNextEvents().isEmpty());
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
subscriber.assertReceivedOnNext(expected);
}
TestSubscriber 不僅僅只收集數據,還有如下一些函數:
java.lang.ThreadgetLastSeenThread()
java.util.List<Notification<T>> getOnCompletedEvents()
java.util.List<java.lang.Throwable> getOnErrorEvents()
java.util.List<T> getOnNextEvents()
有兩點需要額外注意:一、getLastSeenThread 函數。 TestSubscriber 會檢查在那個線程執行回調函數,并記錄最后一個線程。如果你想測試回調函數是否發生在 GUI 線程,則可以使用這個函數。二、有趣的是 getOnCompletedEvents 可以返回多個結束事件。這是違反 Rx 約定的情況,可以通過測試來檢查。
TestSubscriber 還提供了一些常見的判斷函數:
void assertNoErrors()
void assertReceivedOnNext(java.util.List<T> items)
void assertTerminalEvent()
void assertUnsubscribed()
另外還可以阻塞直到特定的事件發生:
void awaitTerminalEvent()
void awaitTerminalEvent(long timeout, java.util.concurrent.TimeUnitunit)
void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, java.util.concurrent.TimeUnitunit)
指定時間可能會導致超時的異常(沒有在規定的時間內完成)。