Reactor實例解析
要點
- Reactor是一個運行在Java8之上的響應式流框架,它提供了一組響應式風格的API
- 除了個別API上的區別,它的原理跟RxJava很相似
- 它是第四代響應式框架,支持操作融合,類似RxJava 2
- Spring 5的響應式編程模型主要依賴Reactor
RxJava回顧
Reactor是 第四代 響應式框架,跟RxJava 2有些相似。Reactor項目由Pivotal啟動,以響應式流規范、Java8和ReactiveX術語表為基礎。它的設計是Reactor 2(上一個主要版本)和RxJava核心貢獻者共同努力的結果。
在之前的同系列文章“RxJava實例解析”和“測試RxJava”里,我們已經了解了響應式編程的基礎:數據流的概念、Observable類和它的各種操作以及通過工廠方法創建靜態和動態的Observable對象。
Observable是事件的源頭,Observer提供了一組簡單的接口,并通過訂閱事件源來消費Observable的事件。Observable通過onNext向Observer通知事件的到達,后面可能會跟上onError或onComplete來表示事件的結束。
RxJava提供了TestSubscriber來測試Observable,TestSubscriber是一個特別的Observer,可以用它斷言流事件。
在這篇文章里,我們將會對Reactor和RxJava進行比較,包括它們的相同點和不同點。
Reactor的類型
Reactor有兩種類型, Flux<T> 和 Mono<T> 。Flux類似RaxJava的Observable,它可以觸發零到多個事件,并根據實際情況結束處理或觸發錯誤。
Mono最多只觸發一個事件,它跟RxJava的 Single 和 Maybe 類似,所以可以把Mono<Void>用于在異步任務完成時發出通知。
因為這兩種類型之間的簡單區別,我們可以很容易地區分響應式API的類型:從返回的類型我們就可以知道一個方法會“發射并忘記”或“請求并等待”(Mono),還是在處理一個包含多個數據項的流(Flux)。
Flux和Mono的一些操作利用了這個特點在這兩種類型間互相轉換。例如,調用Flux<T>的single()方法將返回一個Mono<T>,而使用concatWith()方法把兩個Mono串在一起就可以得到一個Flux。類似地,有些操作對Mono來說毫無意義(例如take(n)會得到n>1的結果),而有些操作只有作用在Mono上才有意義(例如or(otherMono))。
Reactor設計的原則之一是要保持API的精簡,而對這兩種響應式類型的分離,是表現力與API易用性之間的折中。
“使用響應式流,基于Rx構建”
正如“RxJava實例解析”里所說的,從設計概念方面來看,RxJava有點類似Java 8 Steams API。而Reactor看起來有點像RxJava,不過這決不只是個巧合。這樣的設計是為了能夠給復雜的異步邏輯提供一套原生的具有Rx操作風格的響應式流API。所以說Reactor扎根于響應式流,同時在API方面盡可能地與RxJava靠攏。
響應式類庫和響應式流的使用
Reactive Streams (以下簡稱為RS)是“一種規范,它為基于非阻塞回壓的異步流處理提供了標準”。它是一組包含了TCK工具套件和四個簡單接口(Publisher、Subscriber、Subscription和Processor)的規范,這些接口將被集成到Java 9.
RS主要跟響應式回壓(稍后會詳細介紹)以及多個響應式事件源之間的交互操作有關。它并不提供任何操作方法,它只關注流的生命周期。
Reactor不同于其它框架的最關鍵一點就是RS。Flux和Mono這兩者都是RS的Publisher實現,它們都具備了響應式回壓的特點。
在RxJava 1里,只有少部分操作支持回壓,RxJava 1的Observable并沒有實現RS里的任何類型,不過它有一些RS類型的適配器。可以說,RxJava 1實際上比RS規范出現得更早,而且在RS規范設計期間,RxJava 1充當了函數式工作者的角色。
所以,你在使用那些Publisher適配器時,它們并不會為你提供任何操作。為了能做一些有用的操作,你可能需要用回Observable,而這個時候你需要另一個適配器。這種視覺上的混亂會破壞代碼的可讀性,特別是像Spring 5這樣的框架,如果整個框架建立在這樣的Publisher之上,那么就更是雜亂不堪。
RS規范不支持null值,所以在從RxJava 1遷移到Reactor或RxJava 2時要注意這點。如果你在代碼里把null用作特殊用途,那么就更是要注意了。
RxJava 2是在RS規范之后出現的,所以它直接在Flowable類型里實現了Publisher。不過除了RS類型,RxJava 2還保留了RxJava 1的“遺留”類型(Observable、Completable和Single)并且引入了其它一些可選類型——Maybe。這些類型提供了不同的語義,不過它們并沒有實現RS接口,這是它們的不足之處。跟RxJava 1不一樣,RxJava 2的Observable不支持RxJava 2的回壓協議(只有Flowable具備這個特性)。之所以這樣設計是為了能夠為一些場景提供一組豐富且流暢的API,比如用戶界面發出的事件,在這樣的場景里是不需要用到回壓的,而且也不可能用到。Completable、Single和Maybe不需要支持回壓,不過它們也提供了一組豐富的API,而且在被訂閱之前不會做任何事情。
在響應式領域,Reactor變得愈加精益,它的Mono和Flux兩種類型都實現了Publisher,并且都支持回壓。雖然把Mono作為一個Publisher需要付出一些額外的開銷,不過Mono在其它方面的優勢彌補了它的缺點。在后續部分我們將看到對Mono來說回壓意味著什么。
相比RxJava,API相似但不相同
ReactiveX和RxJava的操作術語表有時候真的難以掌握,因為歷史原因,有些操作的名字讓人感到困惑。Reactor盡量把API設計得緊湊,在給API取名時盡量選擇好一點的名字,不過總的來說,這兩套API看起來還是很相像。在最新的RxJava 2迭代版本中,RxJava 2借鑒了Reactor的一些術語,這預示著這兩個項目之間可能會有越來越緊密的合作。一些操作和概念總是先出現在其中的一個項目里,然后互相借鑒,最后會同時滲透到兩個項目里。
例如,Flux也有常見的just工廠方法(雖然只有兩種變形:接受一個參數或變長參數)。不過from方法有很多個變種,最值得一提的是fromIterable。當然,Flux也包含了那些常規的操作:map、merge、concat、flatMap、take,等等。
Reactor把RxJava里令人困惑的amb操作改成了看起來更加中肯的firstEmitting。另外,為了保持API的一致,toList被重新命名為collectList。實際上,所有以collect開頭的操作都會把值聚合到一個特定類型的集合里,不過只會為每個集合生成一個Mono。而所有以to開頭的操作被保留用于類型轉換,轉換之后的類型可以用于非響應式編程,例如toFuture()。
在類初始化和資源使用方面,Reactor之所以也能表現得如此精益,要得益于它的融合特性:Reactor可以把多個串行的操作(例如調用concatWith兩次)合并成單個操作,這樣就可以只對這個操作的內部類做一次初始化(也就是macro-fusion)。這個特性包含了基于數據源的優化,抵消了Mono在實現Publisher時的一些額外開銷。它還能在多個相關的操作之間共享資源(也就是micro-fusion),比如內部隊列。這些特性讓Reactor成為不折不扣的的第四代響應式框架,不過這個超出了這篇文章的討論范圍。
下面讓我們來看看幾個Reactor的操作。
一些操作示例
(這一小節包含了一些代碼片段,我們建議你動手去運行它們,深入體驗一下Reactor。所以你需要打開IDE,并創建一個測試項目,把Reactor加入到依賴項里。)
對于Maven,可以把下面的依賴加到pom.xml里:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.3.RELEASE</version>
</dependency>
對于Gradle,要把Reactor作為依賴項,類似這樣:
dependencies {
compile "io.projectreactor:reactor-core:3.0.3.RELEASE"
}
我們來重寫前面幾篇同系列文章里的例子!
Observable的創建跟在RxJava里有點類似,在Reactor里可以使用just(T...)和fromIterator(Iterable<T>)工廠方法來創建。just方法會把List作為一個整體觸發,而fromIterable會逐個觸發List里的每個元素:
public class ReactorSnippets {
private static List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
@Test
public void simpleCreation() {
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);
fewWords.subscribe(System.out::println);
System.out.println();
manyWords.subscribe(System.out::println);
}
}</code></pre>
跟在RxJava里一樣,上面的代碼會打印出:
Hello
World
the
quick
brown
fox
jumped
over
the
lazy
dog
為了打印句子里的每一個字母,我們還需要flatMap方法(跟在RxJava里一樣),不過在Reactor里我們使用fromArray來代替from。然后我們會用distinct過濾掉重復的字母,并用sort對它們進行排序。最后,我們使用zipWith和range輸出每個字母的次序:
@Test
public void findingMissingLetter() {
Flux<String> manyLetters = Flux
.fromIterable(words)
.flatMap(word -> Flux.fromArray(word.split("")))
.distinct()
.sort()
.zipWith(Flux.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string));
manyLetters.subscribe(System.out::println);
}</code></pre>
我們可以很容易地看到 s 被遺漏了:
1. a
- b
...
- r
- t
- u
...
z</code></pre>
我們可以通過糾正單詞數組來修復這個問題,不過也可以使用concat/concatWith和一個Mono來手動往字母Flux里添加“s”:
@Test
public void restoringMissingLetter() {
Mono<String> missing = Mono.just("s");
Flux<String> allLetters = Flux
.fromIterable(words)
.flatMap(word -> Flux.fromArray(word.split("")))
.concatWith(missing)
.distinct()
.sort()
.zipWith(Flux.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string));
allLetters.subscribe(System.out::println);
}</code></pre>
這樣,在去重和排序后,遺漏的s字母就被添加進來了:
1. a
- b
...
- r
- s
- t
...
z</code></pre>
上一篇文章提到了Rx和Streams API之間的相似之處,而實際上,在數據就緒的時候,Reactor也會像Java Steams那樣開始簡單地推送數據事件(可以參看下面關于回壓的內容)。只是在主線程里對事件源進行訂閱無法完成更加復雜的異步操作,主要是因為在訂閱完成之后,控制權會馬上返回到主線程,并退出整個程序。例如:
@Test
public void shortCircuit() {
Flux<String> helloPauseWorld =
Mono.just("Hello")
.concatWith(Mono.just("world")
.delaySubscriptionMillis(500));
helloPauseWorld.subscribe(System.out::println);
}</code></pre>
這個單元測試會打印出“Hello”,但無法打印出“world”,因為程序會過早地退出。在做簡單測試的時候,如果你只是像這樣寫一個簡單的主類,你通常會掉入陷阱。作為補救,你可以創建一個CountDownLatch對象,并在Subscriber(包括onError和onComplete)里調用countDown方法。不過這樣就變得不那么響應式了,不是嗎?(萬一你忘了調用countDown方法,而剛好發生錯誤了該怎么辦?)
解決這個問題的第二種方法是使用一些操作轉換到非響應式模式。toItetable和toStream會生成阻塞實例。我們在例子里使用toStream:
@Test
public void blocks() {
Flux<String> helloPauseWorld =
Mono.just("Hello")
.concatWith(Mono.just("world")
.delaySubscriptionMillis(500));
helloPauseWorld.toStream()
.forEach(System.out::println);
}</code></pre>
正如你所期待的那樣,在打印出“Hello”之后有一個短暫的停頓,然后打印出“world”并退出。我們之前也提過,RxJava的amb操作在Reactor里被重命名為firstEmitting(正如它的名字所表達的:選擇第一個Flux來觸發)。在下面的例子里,我們會創建一個Mono,這個Mono會有450毫秒的延遲,還會創建一個Flux,這個Flux以400毫秒的間隔觸發事件。在使用firstEmitting()對它們進行合并時,因為Flux的第一個值先于Mono的值出現,所以最后Flux會被采用:
@Test
public void firstEmitting() {
Mono<String> a = Mono.just("oops I'm late")
.delaySubscriptionMillis(450);
Flux<String> b = Flux.just("let's get", "the party", "started")
.delayMillis(400);
Flux.firstEmitting(a, b)
.toIterable()
.forEach(System.out::println);
}</code></pre>
這個單元測試會打印出句子的所有部分,它們之間有400毫秒的時間間隔。
這個時候你可能會想,如果我寫的測試使用的是4000毫秒的間隔而不是400毫秒,那會怎樣?你不會想在一個單元測試里等待4秒鐘的!在后面的部分,我們會看到Reactor提供了一些測試工具可以很好地解決這個問題。
我們已經通過例子比較了Reactor的一些常用操作,現在我們要回頭看看這個框架其它方面的不同點。
基于Java 8
Reactor選擇Java 8作為運行基礎而不是之前的任何版本,這再一次與它簡化API的目標不謀而合:RxJava選擇了Java 6,而Java 6里沒有java.util.function包,RxJava也就無法利用這個包下面的Functino類和Consumer類,所以它必須創建很多類似Func1、Func2、Action0、Action1這樣的類。RxJava 2使用類似Reactor 2的方式把這些類作為java.util.function的鏡像,因為它還得支持Java 7。
Reactor API還使用了Java 8里新引入的一些類型。因為大部分基于時間的操作都跟時間段有關系(例如超時、時間間隔、延遲,等等),所以直接就使用了Java 8里的Duration類。
Java 8 Stream API和CompletableFuture跟Flux/Mono之間可以很容易地進行互相轉換。那么一般情況下我們是否要把Stream轉成Flux?不一定。雖然說Flux或Mono對IO和內存相關操作的封裝所產生的開銷微不足道,不過Stream本身也并不會帶來很大延遲,所以直接使用Stream API是沒有問題的。對于上述情況,在RxJava 2里需要使用Observable,因為Observable不支持回壓,所以一旦對其進行訂閱,它就成為事件推送的來源。Reactor是基于Java 8的,所以在大部分情況下,Stream API已經能夠滿足需求了。要注意的是,盡管Flux和Mono的工廠模式也支持簡單類型,但它們的主要用途還是在于把對象合并到更高層次的流里面。所以一般來說,在現有代碼上應用響應式模式時,你不會希望把“long getCount()”這樣的方法轉成“Mono<Long> getCount()”。
關于回壓
回壓是RS規范和Reactor主要關注點之一(如果還有其它關注點的話)。回壓的原理是說,在一個推送場景里,生產者的生產速度比消費者的消費速度快,消費者會向生產者發出信號說“嘿,慢一點,我處理不過來了。”生產者可以借機控制數據生成的速度,而不是拋棄數據或者冒著產生級聯錯誤的風險繼續生成數據。
你也許會想,在Mono里為什么也需要回壓:什么樣的消費者會被一個單獨的觸發事件壓垮?答案是“應該不會有這樣的消費者”。不過,在Mono和CompletableFuture工作原理之間仍然有一個關鍵的不同點。后者只有推送:如果你持有一個Future的引用,那么說明一個異步任務已經在執行了。另一方面,回壓的Flux或Mono會啟動延遲的拉取-推送迭代:
- 延遲是因為在調用subscribe()方法之前不會發生任何事情
- 拉取是因為在訂閱和發出請求時,Subscriber會向上游發出信號,準備拉取下一個數據塊
- 接下來生產者向消費者推送數據,這些數據在消費者的請求范圍之內
對Mono來說,subscribe()方法就相當于一個按鈕,按下它就等于說“我準備好接收數據了”。Flux也有一個類似的按鈕,不過它是request(n)方法,這個方法是subscribe()的一般化用法。
Mono作為一個Publisher,它往往代表著一個耗費資源的任務(在IO、延遲等方面),意識到這點是理解回壓的關鍵:如果不對其進行訂閱,你就不需要為之付出任何代價。因為Mono經常跟具有回壓的Flux一起被編排到一個響應式鏈上,來自多個異步數據源的結果有可能被組合到一起,這種按需觸發的能力是避免阻塞的關鍵。
我們可以使用回壓來區分Mono的不同使用場景,相比上述的例子,Mono有另外一個常見的使用場景:把Flux的數據異步地聚合到Mono里。reduce和hasElement可以消費Flux里的每一個元素,再把這些數據以某種形式聚合起來(分別是reduce函數的調用結果和一個boolean值),作為一個Mono對外暴露數據。在這種情況下,使用Long.MAX_VALUE向上游發出回壓信號,上游會以完全推送的方式工作。
關于回壓另一個有意思的話題是它如何對存儲在內存里的流的對象數量進行限制。作為一個Publisher,數據源很有可能出現生成數據緩慢的問題,而來自下游的請求超出了可用數據項。在這種情況下,整個流很自然地進入到推送模式,消費者會在有新數據到達時收到通知。當生產高峰來臨,或者在生產速度加快的情況下,整個流又回到了拉取模式。在以上兩種情況下,最多有N項數據(request()請求的數據量)會被保留在內存里。
你可以對內存的使用情況進行更精確的推算,把N項數據跟每項數據需要消耗的內存W結合起來:這樣你就可以推算出最多需要消耗W*N的內存。實際上,Reactor在大多數情況下會根據N來做出優化:根據情況創建內部隊列,并應用預取策略,每次自動請求75%的數據量。
Reactor的操作有時候會根據它們所代表的語義和調用者的期望來改變回壓信號。例如對于操作buffer(10):下游請求N項數據,而這個操作會向上游請求10N的數據量,這樣就可以填滿緩沖區,為訂閱者提供足夠的數據。這通常被稱為“主動式回壓”,開發人員可以充分利用這種特性,例如在微批次場景里,可以顯式地告訴Reactor該如何從一個輸入源切換到一個輸出地。
跟Spring的關系
Reactor是Spring整個生態系統的基礎,特別是Spring 5(通過Spring Web Reactive)和Spring Data “kay”(跟spring-data-commons 2.0相對應的)。
這兩個項目的響應式版本是非常有用的,我們因此可以開發出完全響應式的Web應用:異步地處理請求,一直到數據庫,最后異步地返回結果。Spring應用因此可以更有效地利用資源,避免為每個請求單獨分配一個線程,還要等待I/O阻塞。
Reactor將被用于未來Spring應用的內部響應式核心組件,以及這些Spring組件暴露出來的API。一般情況下,它們可以處理RS Publisher,不過大多數時候它們要面對的是Flux/Mono,需要用到Reactor的豐富特性。當然,你也可以自行選擇其它響應式框架,Reactor提供了可以用來適配其它Reactor類型和RxJava類型甚至簡單的RS類型的鉤子接口。
目前,你可以通過Spring Boot 2.0.0.BUILD-SNAPSHOT和spring-boot-starter-web-reactive依賴項(可以在 start.spring.io 上生成一個這樣的項目)來體驗Spring Web Reactive:
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>
你可以像往常那樣寫你的@Controller,只不過把Spring MVC的底層變成響應式的,把大部分Spring MVC的契約換成了響應式非阻塞的契約。響應式層默認運行在Tomcat 8.5上,你也可以選擇使用Undertow或Netty。

另外,雖然Spring API是以Reactor類型為基礎的,不過在Spring Web Reactive模塊里可以為請求和響應使用各種各樣的響應式類型:
- Mono<T>:作為@RequestBody,請求實體T會被異步反序列化,之后的處理可以跟Mono關聯起來。作為返回類型,每次Mono發出了一個值,T就會被異步序列化并發回客戶端。你可以把請求Mono作為參數,并把參數化了的關聯處理作為結果Mono返回。
- Flux<T>:在流場景里使用(作為@RequestBody使用的輸入流以及包含了Flux返回類型的Server Sent Events)。
- Single/Observable:分別對應Mono和Flux,不過會切換回RxJava。
- Mono作為返回類型:在Mono結束時請求的處理也跟著完成。
- 非響應式返回類型(void和T):這個時候你的@Controller方法是同步的,不過它應該是非阻塞的(短暫的處理)。請求處理在方法執行完畢時結束,返回的T被異步地序列化并發回客戶端。
下面是使用Spring Web Reactive的例子:
@Controller
public class ExampleController {
private final MyReactiveLibrary reactiveLibrary;
public ExampleController(@Autowired MyReactiveLibrary reactiveLibrary) {
this.reactiveLibrary = reactiveLibrary;
}
@RequestMapping("hello/{who}")
@ResponseBody
public Mono<String> hello(@PathVariable String who) {
return Mono.just(who)
.map(w -> "Hello " + w + "!");
}
@RequestMapping(value = "heyMister", method = RequestMethod.POST)
@ResponseBody
public Flux<String> hey(@RequestBody Mono<Sir> body) {
return Mono.just("Hey mister ")
.concatWith(body
.flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
.map(String::toUpperCase)
.take(1)
).concatWith(Mono.just(". how are you?"));
}
}</code></pre>
第一個端點含有一個路徑變量,它被轉成Mono,并被映射到一個問候語里返回給客戶端。
一個發到/hello/SImon的GET請求會得到“Hello Simon!”的文本響應。
第二個端點相對復雜一些:它異步地接收序列化Sir對象(一個包含了firstName和lastName屬性的類)并使用flatMap方法把它映射到一個字母流里,這個字母流包含了lastName的所有字母。然后它選取流里的第一個字母,把它轉成大寫,并跟問候語串在一起。
所以向/heyMister POST一個JSON對象
{
"firstName": "Paul",
"lastName": "tEsT"
}
會返回字符串“Hello mister T. How are you?”。
響應式Spring Data目前也在開發當中,它被作為Kay發布的一部分,代碼在spring-data-commons 2.0.x分支上。現在已經有一個 里程碑 版本可以使用:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-releasetrain</artifactId>
<version>Kay-M1</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement></code></pre>
然后簡單地添加Spring Data Commons的依賴(它會自動從上面的BOM里獲取版本號):
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
Spring Data對響應式的支持主要表現在新的ReactiveCrudRepository接口,它擴展了Repository。這個接口暴露了CRUD方法,使用的是Reactor類型的輸入和返回值。還有一個RxJava 1的版本,叫作RxJava1CrudRepository。要在CrudRepository里通過id獲取一個實體,可以調用“T findOne(ID id)”方法,而在ReactiveCrudRepository和RxJava1CrudRepository里要分別調用“Mono<T> findOne(ID id)”和“Observable<T> findOne(ID id)”。還有其它的變種,它們接收Mono/Single作為參數,異步地提供key,并在此基礎上組合返回結果。
假設有一個響應式的后端存儲(或者mock的ReactiveCrudRepository bean),下面的controller將從前到后都是響應式的:
@Controller
public class DataExampleController {
private final ReactiveCrudRepository<Sir, String> reactiveRepository;
public DataExampleController(
@Autowired ReactiveCrudRepository<Sir, String> repo) {
this.reactiveRepository = repo;
}
@RequestMapping("data/{who}")
@ResponseBody
public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {
return reactiveRepository.findOne(who)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.status(404)
.body(null));
}
}</code></pre>
請注意整個流程:我們異步地獲取實體并用map把它包裝成ResponseEntity,取得一個可以馬上返回的Mono。如果Spring Data repository找不到這個key的數據,會返回一個空的Mono。我們使用defaultIfEmpty顯式地返回404。
測試Reactor
“測試RxJava”這篇文章里提到了如何測試Observable。正如我們所看到的,RxJava提供了TestScheduler,我們可以把它跟RxJava的操作一起使用,這些操作接受一個Scheduler參數,TestScheduler會為這些操作啟動虛擬的時鐘。RxJava還提供了一個TestSubscriber類,可以用它等待Observable執行完畢,也可以用它對每個事件進行斷言(onNext的值和它的數量、觸發的onError,等等)。在RxJava 2里,TestSubscriber就是RS Subscriber,你可以用它測試Reactor的Flux和Mono!
在Reactor里,上述兩個使用廣泛的特性被組合到了StepVerifier類里。從reactor-addons倉庫的reactor-test模塊里可以獲取到StepVerifier。在創建Publisher實例時,調用StepVerifier.create方法可以初始化一個StepVerifier。如果要使用虛擬時鐘,可以調用StepVerifier.withVirtualTime方法,這個方法接受一個Supplier作為參數。之所以這樣設計,是因為它會首先保證創建一個VirtualTimeScheduler對象,并把它作為默認的Scheduler傳給舊有的操作。StepVerifier會對在Supplier里創建的Flux/Mono進行配置,把基于時間的操作轉為“虛擬時間操作”。接下來你就可以編寫各種你所期望的用例:下一個元素應該是什么,是否應該出現錯誤,是否應該及時向前移動,等等。借助其它方法,比如事件與Predicate的匹配或者對onNext事件的消費,你可以與那些值之間做一些更高級的交互(猶如在使用斷言框架)。任何地方拋出的AssertionError都會在最終的結果里反應出來。最后,調用verify()對你的用例進行測試,這個方法會通過StepVerifier.create或StepVerifier.withVirtualTime方法對預定義的事件源進行訂閱。
讓我們來舉一些簡單的例子來說明StepVerifier時如何工作的。首先要添加依賴到POM里:
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-test</artifactId>
<version>3.0.3.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.5.2</version>
<scope>test</scope>
</dependency></code></pre>
假設你有一個叫作MyReactiveLibrary的類,你要對這個類生成的一些Flux進行測試:
@Component
public class MyReactiveLibrary {
public Flux<String> alphabet5(char from) {
return Flux.range((int) from, 5)
.map(i -> "" + (char) i.intValue());
}
public Mono<String> withDelay(String value, int delaySeconds) {
return Mono.just(value)
.delaySubscription(Duration.ofSeconds(delaySeconds));
}
}</code></pre>
第一個方法將返回給定字母之后的5個字母。第二個方法返回一個Flux,它會以給定的時間間隔觸發給定值,其中的時間間隔以秒為單位。第一個測試是要保證使用x調用alphabet5的輸出被限定在x、y、z。使用StepVerifier看起來是這樣的:
@Test
public void testAlphabet5LimitsToZ() {
MyReactiveLibrary library = new MyReactiveLibrary();
StepVerifier.create(library.alphabet5('x'))
.expectNext("x", "y", "z")
.expectComplete()
.verify();
}
第二個測試要保證alphabet5返回的每個值都是字母。在這里我們使用斷言框架 AssertJ :
@Test
public void testAlphabet5LastItemIsAlphabeticalChar() {
MyReactiveLibrary library = new MyReactiveLibrary();
StepVerifier.create(library.alphabet5('x'))
.consumeNextWith(c -> assertThat(c)
.as("first is alphabetic").matches("[a-z]"))
.consumeNextWith(c -> assertThat(c)
.as("second is alphabetic").matches("[a-z]"))
.consumeNextWith(c -> assertThat(c)
.as("third is alphabetic").matches("[a-z]"))
.consumeNextWith(c -> assertThat(c)
.as("fourth is alphabetic").matches("[a-z]"))
.expectComplete()
.verify();
}
結果這些測試都運行失敗。讓我們檢查一下StepVirifier的輸出,看看能不能找出bug:
java.lang.AssertionError: expected: onComplete(); actual: onNext({)
java.lang.AssertionError: [fourth is alphabetic]
Expecting:
"{"
to match pattern:
"[a-z]"</code></pre>
看起來我們的方法并沒有在z的時候停住,而是繼續發出ASCII字符。我們可以加入.take(Math.min(5,'z'-from+1))來修復這個bug,或者把Math.min作為range的第二個參數。
我們要做的最后一個測試需要用到虛擬時鐘:我們使用withVirtualTime構造器來測試方法的延遲,而不需要真的等待指定的時間:
@Test
public void testWithDelay() {
MyReactiveLibrary library = new MyReactiveLibrary();
Duration testDuration =
StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30))
.expectSubscription()
.thenAwait(Duration.ofSeconds(10))
.expectNoEvent(Duration.ofSeconds(10))
.thenAwait(Duration.ofSeconds(10))
.expectNext("foo")
.expectComplete()
.verify();
System.out.println(testDuration.toMillis() + "ms");
}
這個測試用例測試一個將被延遲30秒的Flux:在訂閱之后的30秒內不會發生任何事情,然后發生一個onNext("foo")事件后結束。
System.out會打印出驗證所需要的時間,在最近的一次測試中它用掉了8毫秒。
如果調用構造器的create方法,thenAwait和expectNoEvent方法仍然可以使用,不過它們會阻塞指定的時間。
StepVerifier還有其它很多方法可以用于對Publisher進行測試(如果你有其它的想法,歡迎加入或反饋到 github倉庫 )。
自定義動態源
在“RxJava實例解析”一文中提到的動態和靜態Observable對Reactor來說也是適用的。
如果你要創建一個自定義的Flux,需要使用Reactor的FluxSink。這個類將會為你考慮所有跟異步有關的情況,你只需要把注意力集中在觸發事件上。
使用Flux.create并從回調中獲得的FluxSink可以用于后續的觸發事件。這個自定義的Flux是靜態的,為了把它變成動態的,可以使用publish()和connect()方法。基于上一篇文章中的例子,我們幾乎可以把它逐字逐句地翻譯成Reactor的版本:
SomeFeed<PriceTick> feed = new SomeFeed<>();
Flux<PriceTick> flux =
Flux.create(emitter ->
{
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.next(event);
if (event.isLast()) {
emitter.complete();
}
}
@Override
public void error(Throwable e) {
emitter.error(e);
}};
feed.register(listener);
}, FluxSink.OverflowStrategy.BUFFER);
ConnectableFlux<PriceTick> hot = flux.publish();</code></pre>
在連接到動態Flux之前,可以做兩次訂閱:一個訂閱將打印每個tick的細節,另一個訂閱會打印出instrument:
hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick
.getDate(), priceTick.getInstrument(), priceTick.getPrice()));
hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));</code></pre>
接下來我們連接到動態Flux,并在程序結束前讓它運行5秒鐘:
hot.connect();
Thread.sleep(5000);
(要注意,如果PriceTick的isLast()方法改變了,那么feed本身也會結束)。
FluxSink通過isCancelled()來檢查下游的訂閱是否已取消。你還可以通過requestedFromDownstream()來獲得請求數,這個在遵循回壓策略時很管用。最后,你可以通過setCancellation方法釋放所有使用過的資源。
要注意,FluxSink使用了回壓,所以你必須提供一個OverflowStrategy來顯式地處理回壓。這個等價于使用onBackpressureXXX操作(例如,FluxSink.OverflowStrategy.BUFFER等價于.onBackpressureBuffer()),它們會覆蓋來自下游的回壓信號。
結論
在這篇文章里,我們學習了Reactor,一個運行在Java 8之上并以Rx規范和Reactive Streams規范為基礎的第四代響應式框架。我們展示了RxJava中的設計理念是如何被應用在Reactor上的,盡管它們之間有一些API設計上的差別。我們還展示了Reactor如何成為Spring 5的基礎,還提供了一些跟測試Publisher、Flux和Mono有關的資源。
來自:http://www.infoq.com/cn/articles/handle-data-science