測試RxJava2
關注InfoQ資訊的讀者可能已經留意到,我們前面給出了一篇很好的RxJava文章“測試RxJava”。本文是上一篇文章的修訂,用相同的例子循序漸進地介紹了如何測試RxJava2。譯者將兩篇文章中的不同之處用粗體標識出來,并使用添加注釋的形式說明示例代碼中的差異之處,以供讀過前篇的讀者快速瀏覽本文。
關鍵要點:
- RxJava含有內建的、測試友好的解決方案。
- 使用TestSubscriber去驗證Observable。
- 使用TestScheduler可實現對時間的嚴格控制。
- Awaitility庫提供了對測試環境進一步的控制。
本文是“測試RxJava”一文的修訂,根據RxJava2規范做了全面更新。
你已經閱讀過RxJava的相關內容,也已經在互聯網上體驗過像“RxJava實例解析”中的那些示例,現在打算在自己的代碼中探索一下響應式編程了。但是,現在卻一直困擾著如何測試那些可能會在代碼庫中發現的新功能呢?
使用響應式編程,就必須轉變對給定問題的推理方式,因為我們要聚焦于作為事件流的流動數據,而非個別數據項。事件通常是被不同的線程所產生和消費,因此在編寫測試時必須要對并發問題有著清晰的認識。幸運的是,RxJava提供了測試Observable和Subscription的內建支持,并且是直接構建于RxJava的核心依賴中。
第一步
讓我們回顧一下在“RxJava by Example”一文中所給出的那個詞匯的例子,看一下如何對該例子作測試。讓我們從基礎測試工具的設置開始。在我們的測試架構中,使用了JUnit作為測試工具。
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
//RxJava2中,包名由rx.xxx改為io.reactivex.xxx。
import org.junit.Test;
import java.util.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertThat;
import static org.hamcrest.Matchers.*;
public class RxJavaTest {
private static final List<String> WORDS = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
}
事實上在沒有給定調度器(Scheduler)的情況下,Subscription將默認運行于調用線程上。因此我們將在首個測試中使用原生的方法。這意味著我們可實現一個Subscription接口的對象,在Subscription發生后就立刻對其狀態做斷言(assert)。
@Test
public void testInSameThread() {
// given:
List<String> results = new ArrayList<>();
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribe(results::add);
// then:
assertThat(results, notNullValue());
assertThat(results, hasSize(9));
assertThat(results, hasItem(" 4. fox"));
}
注意這里使用了顯式的List<String>容器,與實際訂閱者一起累計結果。由于給定的測試很簡單,所以可能會使你認為這種顯式累加器的方法已經足夠好了。但是切記產品級的Observable中可能封裝了錯誤或可能產生意外的事件。例子中的Subscriber與累加器的簡單組合并不足以覆蓋這種情況。但不用為此煩惱,RxJava提供的TestSubscriber類型就是用于處理這種情況的。下面我們使用TestSubscriber類型重構上面的測試。
@Test
//例子中所有的Subscriber改為Observer。
public void testUsingTestObserver() {
// given:
TestObserver<String> observer = new TestObserver<>();
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribe(observer);
// then:
observer.assertComplete();
observer.assertNoErrors();
observer.assertValueCount(9);
//getOnNextEvents方法該為values方法。
assertThat(observer.values(), hasItem(" 4. fox"));
}
TestObserver不僅可替代用戶累加器,還另給出了一些行為。例如它能夠給出接收到的消息和每個事件相關數據的規模,它也可對Subscription被完成且在Observable消費期間沒有錯誤出現的狀態做斷言。雖然當前測試中的Observable并未生成任何的錯誤,但是回到“RxJava by Example”一文,我們從中得知了Observable將例外與數據事件等同對待。我們可通過如下的方式通過連接例外事件而模擬錯誤:
@Test
//例子中所有的Subscriber改為Observer。
public void testFailure() {
// given:
TestObserver<String> observer = new TestObserver<>();
Exception exception = new RuntimeException("boom!");
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string))
.concatWith(Observable.error(exception));
// when:
observable.subscribe(observer);
// then:
observer.assertError(exception);
observer.assertNotComplete();
}
在我們所給出的有限用例中,所有的機制運行良好。但是實際的產品代碼可能會完全不同于例子。因此在下文中,我們將考慮一些更加復雜的產品實例。
定制調度器(Scheduler)
在產品代碼中,很多用例中的Observable都是在特定的線程上執行,這種線程在響應式編程環境中被稱為“調度器(Scheduler)”。很多Observable操作將可選的調度器參數作為附加參數使用。RxJava定義了一系列任何時候都可用的命名調度器,包括IO調度器(io)、計算調度器(computation,為共享線程)和新線程調度器(newThread)。開發人員也可去實現個人定制的調度器。讓我們通過指定計算調度器來修改Observable的代碼吧。
@Test
//例子中所有的Subscriber改為Observer。
public void testUsingComputationScheduler() {
// given:
TestObserver<String> observer = new TestObserver<>();
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(observer);
//修訂版中新添加語句。
await().timeout(2, SECONDS)
.until(observer::valueCount, equalTo(9));
// then:
observer.assertComplete();
observer.assertNoErrors();
assertThat(observer.values(), hasItem(" 4. fox"));
}
當運行時就會立刻發現該代碼是存在問題的。Subscriber在測試線程上執行其斷言,但是Observable在后臺線程(計算線程)上生成值。這意味著執行Subscriber斷言可能先于Observable生成所有相關事件,因而導致測試的失敗。
為使測試順利執行,有如下的一些策略可選:
- 將Observable轉化為阻塞式的。
- 強制測試等待,直至給定的條件被滿足。
- 將計算調度器轉換為即刻(Schedulers.immediate())調度器。
我們將對每個策略做展開介紹,但將從“將Observable轉化為阻塞式”開始,因為實現該策略所需做的技術工作最少,這些工作與所使用的調度器無關。我們假設數據在后臺線程中生成,這將導致Subscriber從同一后臺線程得到通知。
我們要做的是強制生成所有的事件,并在下一個聲明被執行前就在測試中完成Observable。這是通過在Observable自身上調用 blockingIterable() 方法實現的。
@Test
public void testUsingBlockingCall() {
// given:
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
//RxJava2中,toBlocking()和toIterable()方法改為blockingIterable()
Iterable<String> results = observable
.subscribeOn(Schedulers.computation())
.blockingIterable();
// then:
assertThat(results, notNullValue());
assertThat(results, iterableWithSize(9));
assertThat(results, hasItem(" 4. fox"));
}
該方法雖然適用于我們所給出的簡單代碼,但可能并不適用于實際的產品代碼。如果生產者生成所有的數據需要很長的時間,那將會產生什么后果?這將使測試變得非常慢,并增加了編譯時間,還可能會有其它的性能問題。 幸運的是,TestObserver提供了一系列方法,強制測試等待事件的結束。下面給出了一種實現方法:
//修訂版中新添加的代碼段。
@Test
public void testUsingComputationScheduler() {
// given:
TestObserver<String> observer = new TestObserver<>();
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(observer);
observer.awaitTerminalEvent(2, SECONDS);
// then:
observer.assertComplete();
observer.assertNoErrors();
assertThat(observer.values(), hasItem(" 4. fox"));
}
如果這些方法還不足以滿足需求,這里我推薦一個便利的程序庫,就是 Awaitility 。簡單地說,Awaitility是一個以精確、簡單易讀的方式對異步系統相關期望進行表述的DSL。在項目中可以用Maven添加Awaitility的依賴關系。
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
或是使用Gradle:
testCompile 'org.awaitility:awaitility:2.0.0'
Awaitility DSL的接入點是org.awaitility.Awaitility.await()方法(參見下面例子中的第13和14行代碼)。可以使用Awaitility定義使測試繼續所必須達成的條件,也可在條件中加入超時或其它的時序約束,例如最小、最大或持續范圍。對于上面的例子,下面的代碼給出了如何在結果中使用Awaitility:
@Test
//例子中所有的Subscriber改為Observer。
public void testUsingComputationScheduler_awaitility() {
// given:
TestObserver<String> observer = new TestObserver<>();
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(observer);
await().timeout(2, SECONDS)
.until(observer::valueCount, equalTo(9));
// then:
observer.assertComplete();
observer.assertNoErrors();
//getOnNextEvents()方法改為values()方法。
assertThat(observer.values(), hasItem(" 4. fox"));
}
此版本測試并未以任何方式改變Observable的本質,這使得你做測試時不必對產品代碼做任何改動。該版本測試使用最多2秒的等待時間通過檢查Subscriber狀態使Observable執行其作業。如果一切進行順利,在2秒內就可將Subscriber的狀態釋放給所有的9個事件。
Awaitility具有和Hamcrest的匹配符、Java 8的lambda表達式和方法引用等的良好協作,從而給出精確的和可讀的測試條件。Awaitility還提供了預制擴展,用于那些被廣泛使用的JVM語言,其中包括Groovy和Scala。
我們要給出最后一個策略中使用了RxJava的擴展機制,該擴展是以RxJava API的組成部分發布的。RxJava中定義了一系列的擴展點,允許對幾乎任何默認的RxJava行為進行微調。這種擴展機制使我們可以針對特定的RxJava特性提供修改過的值。利用該機制,在無需關心生成代碼中所指定的調度器的情況下,我們可在測試中注入選定的調度器。這正是我們所尋找的方法,該方法被封裝在RxJavaHooks類中。假設產品代碼依賴于計算調度器,我們將覆蓋它的默認值,返回一個調度器,它作為被調用的代碼使事件處理發生,這是 入隊調度器(Schedulers.trampoline()) 。下面給出測試的代碼:
@Test
//名稱中的Hook改為Plugins,代碼中所有subscriber改為observer。
public void testUsingRxJavaPluginsWithImmediateScheduler() {
// given:
//調度器由immediate改為trampoline。
RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
Schedulers.trampoline());
TestObserver<String> observer = new TestObserver<>();
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
try {
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(observer);
// then:
observer.assertComplete();
observer.assertNoErrors();
observer.assertValueCount(9);
assertThat(observer.values(), hasItem(" 4. fox"));
} finally {
RxJavaPlugins.reset();
}
}
在測試中,產品代碼察覺不到計算調度器是即刻的。請注意鉤子函數必須被重置,否則即刻調度器的設置可能會發生泄漏,導致在各處的測試被破壞。使用try/finall代碼塊會在一定程度上模糊了測試的目的,但是幸運的是我們可以使用JUnit規則重構該行為,使測試更加精煉,結果更可讀。下面給出使用上述規則的一種可能的實現代碼:
//由Public改為private static
private static class ImmediateSchedulersRule implements TestRule {
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
//所有的調度器由immediate改為trampline。
public void evaluate() throws Throwable {
RxJavaPlugins.setIoSchedulerHandler(scheduler ->
Schedulers.trampoline());
RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
Schedulers.trampoline());
RxJavaPlugins.setNewThreadSchedulerHandler(scheduler ->
Schedulers.trampoline());
try {
base.evaluate();
} finally {
RxJavaPlugins.reset();
}
}
};
}
}
此外,我們還對另外兩個調度器的生成方法做了重寫。該規則對此后其它的測試目標更為通用。在新的測試用例類中,該規則的使用方法很直接,只需簡單地定義一個域,并將其中新類型標注為@Rule即可。示例代碼如下:
@Rule
public final ImmediateSchedulersRule schedulers =
new ImmediateSchedulersRule();
@Test
//例子中所有的Subscriber改為Observer。
public void testUsingImmediateSchedulersRule() {
// given:
TestObserver<String> observer = new TestObserver<>();
//Observable的from方法改為fromIterable。
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(observer);
// then:
observer.assertComplete();
observer.assertNoErrors();
observer.assertValueCount(9);
assertThat(observer.values(), hasItem(" 4. fox"));
}
最終我們可得到與前面測試一樣的行為,卻沒有像前面測試那樣的雜亂。下面用一些篇幅來回顧一下我們目前已經做到的事情:
- Subscribers將在同一線程中處理數據,只要沒有使用特定的調度器。這意味著在Subscriber向Observable做訂閱后,我們就可在該Subscriber上做斷言。
- TestObserver 可累計事件,并給出自身狀態的追加斷言。
- 任何對象都可轉換為阻塞式的,這使得無論對象使用何種調度器,我們都可以同步等待事件的生成。
- RxJava提供了擴展機制,允許開發人員重寫其默認方法,并以適當的方式注入到產品代碼中。
- 并發代碼可使用Awaitility DSL測試。
上述的每個技術都作用于不同的場景中,但是所有技術都是通過“共同的線程”(譯者注:作者在原文中指出common thread是作為雙關語使用的,其另一個意思是“類似的思路”)相關聯:在對Subscriber狀態做斷言之前,測試代碼需等待Observable完成。考慮到Observable的行為會生成數據,是否有方法對該行為進行檢查呢?換句話說,是否可以用編程的方式做Observable的現場調試?我們將在后文中給出這樣的技術。
操控時間
到目前為止我們已用黑箱方式測試了Observable和Subscription。下面我們將考慮另外一種操控時間的技術,該技術使我們可以在Observable依然處于活動狀態時,打開引擎蓋去查看Subscriber狀態。換句話說,我們將使用采用了RxJava的TestScheduler類白箱測試技術,這可以說是RxJava再一次來救場。這種特定的調度器可精確地設定時間的內部使用方式,例如可將時間提前半秒,或是使時間跳躍5秒。我們將首先給出這種新調度器實例的創建方法,然后再討論代碼的測試。
@Test
//例子中所有的Subscriber改為Observer。
public void testUsingTestScheduler() {
// given:
TestScheduler scheduler = new TestScheduler();
TestObserver<String> observer = new TestObserver<>();
Observable<Long> tick = Observable.interval(1, SECONDS, scheduler);
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(tick,
(string, index) -> String.format("%2d. %s", index, string));
observable.subscribeOn(scheduler)
.subscribe(observer);
// expect:
observer.assertNoValues();
observer.assertNotComplete();
// when:
scheduler.advanceTimeBy(1, SECONDS);
// then:
observer.assertNoErrors();
observer.assertValueCount(1);
observer.assertValues(" 0. the");
// when:
scheduler.advanceTimeTo(9, SECONDS);
observer.assertComplete();
observer.assertNoErrors();
observer.assertValueCount(9);
}
該“產品”代碼有了略微的改變,這是由于我們使用了綁定到調度器時隙(interval())的方法生成計數(第6行),而非生成一個計數的范圍。但這樣做具有一個副作用,就是計數是從零開始生成的,而非從1開始。一旦配置了Observable和測試調度器,我們立刻做出這樣的斷言,即假定Subscriber不具有值(第15行)且沒有被完成或生成任何的錯誤(第16行)。這是一個完整性測試,因為此時調度器并沒有被移動,因而沒有任何值被Observable產生或是被Subscriber接收到。
下面將時間向前調1整秒( 第21行 ),該操作將會導致Observable生成第一個值,這正是隨后的斷言集所要檢查的( 第24到26行 )。
下面將時間從當前時間調到9秒。需要注意的是,這意味著將時間準確地調整為調度器啟動后的第9秒(并非是向前調1秒后再向前調9秒,即調度器檢查啟動后的第10秒)。換句話說,advanceTimeBy()方法將調度器的時間調整為相對于當前位置的時間,而advanceTimeTo()以絕對的方式調整時間。此后我們做出下一輪的斷言( 第30到32行 ),用于確保所有的數據由Observable生成且被Subscriber消費。另一件需要說明的事情就是使用TestScheduler時,真實的時間是立刻發生調整的,這著意味著測試并不用實際等待9秒才去完成。
正如你所看到的,該調度器的使用是非常便利的,僅需將該調度器提供給正在測試的Observable即可。但是對使用了指定類型調度器的Observable,該調度器并不能很好地適用。但是稍等一下,之前我們看到的是如何使用 RxJavaPlugins 切換一個不影響生產代碼的調度器,而這一次是提供一個代替即刻調度器的TestScheduler。我們甚至可以apply定制JUnit規則同樣的技術,使之前的代碼可以用更重用的方式予以重寫。首先該新規則為:
//由public改為private static,所有Hooks改為Plugins。
private static class TestSchedulerRule implements TestRule {
private final TestScheduler testScheduler = new TestScheduler();
public TestScheduler getTestScheduler() {
return testScheduler;
}
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaPlugins.setIoSchedulerHandler(scheduler ->
testScheduler);
RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
testScheduler);
RxJavaPlugins.setNewThreadSchedulerHandler(scheduler ->
testScheduler);
try {
base.evaluate();
} finally {
RxJavaPlugins.reset();
}
}
};
}
}
緊接著是實際的測試代碼(在一個新的測試用例類中),去使用我們的測試規則:
@Rule
public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule();
@Test
//例子中所有的Subscriber改為Observer。
public void testUsingTestSchedulersRule() {
// given:
TestObserver<String> observer = new TestObserver<>();
Observable<String> observable = Observable.fromIterable(WORDS)
.zipWith(Observable.interval(1, SECONDS),
(string, index) -> String.format("%2d. %s", index, string));
observable.subscribeOn(Schedulers.computation())
.subscribe(observer);
// expect
observer.assertNoValues();
observer.assertNotComplete();
// when:
testSchedulerRule.getTestScheduler().advanceTimeBy(1, SECONDS);
// then:
observer.assertNoErrors();
observer.assertValueCount(1);
observer.assertValues(" 0. the");
// when:
testSchedulerRule.getTestScheduler().advanceTimeTo(9, SECONDS);
observer.assertComplete();
observer.assertNoErrors();
observer.assertValueCount(9);
}
這樣你就成功地實現了它。使用經由RxJavaHooks注入TestScheduler的方法,可在無需更改原始Observable組合的情況下編寫測試代碼,此外它給出了一種在observable自身執行期間改變時間、并在特定點上做斷言的方法。在本文中給出的所有這些技術,應該足夠你選擇用來測試RxJava的代碼了。
未來
RxJava是最先為Java提供響應式編程能力的程序庫之一。為了使RxJava API更好地符合 Reactive Streams 規范,即將推出的2.0版將會是重新設計的。Reactive Streams規范以Java和JavaScript運行時為目標,提供了使用非阻塞背壓機制(back pressure)的異步流處理標準。這意味著下一版的RxJava中將會出現一些API改進。對這些改進的詳細描述參見 RxJava wiki 。
對于測試而言,這些核心類型(Observable、Maybe和Single)現在都給出了便利易用的test()方法,實現現場創建TestSubscriber實例。也可在TestSubscriber上鏈接方法調用,對這類用法也有一些新的斷言方法。
本文是“測試RxJava”一文的修訂。
來自:http://www.infoq.com/cn/articles/Testing-RxJava2