Airbnb:我們的安卓客戶端是如何使用 RxJava 的

ChesterTeg 8年前發布 | 36K 次閱讀 Android RxJava Android開發 移動開發

介紹 (0:00)

在這篇文章里,我會討論 RxJava 和 Airbnb 的客戶端里運用它的方法,我在 Airbnb 工作了一年多一點的時間。給你一個關于規模的感覺,我們一共有15個人。公司現在在快速擴張中,而且對于一個小組來說,集成新的技術都是非常具有挑戰性的。

為什么用 RxJava? (0:47)

我們都知道 移動開發是困難的。移動用戶期望即時響應,而且還有在不同的線程間來回切換的需求。除了主線程,你還要做網絡連接,同時你還需要在后臺處理其他的各種不同的事情。最重要的是,你不能阻塞 UI 線程。

RxJava 是解決這類問題的好方法,因為他能夠使得線程間的切換比較容易。這已經集成在框架里面了。異步操作非常笨重而且容易出錯,RxJava 使得你不用再這樣做了,這也是你能把不同的線程組合在一起的原因。

我們需要 RxJava 的真正原因其實是 我們的軟件很爛。為什么我們有如此多的 bug?為什么我們需要 crash 報告工具來跟蹤我們成千上萬的 crash,或者多少用戶已經對我們生氣了?這里可能有些什么事情不對勁。

我們需要改變;我覺得 imperative 編程是我們不應該采用的方法。當然,面向對象編程已經流行很多年了。它已經深入到了現代程序員的骨髓里了。每個人都盲目的使用它,但是它不是我們開發軟件的必需品。

Functional 編程是 RxJava 里面的概念,而且我覺得用這種方法,代碼更加健壯,而且永遠不需要維護狀態了。代碼更加可靠而且你知道它一定工作。

底線:我們的問題是我們寫了很多糟糕的代碼,移動開發也是很困難的,而 RxJava 是解決這個問題的一個方案。

流 (3:48)

RxJava 是 ReactiveX 的一部分,一組開源庫。它們有許多不同的庫,包括 JavaScript, Groovy, Ruby, Java, C#,以及其他。然而,它們都有著同樣的概念,這就是 functional 編程。

流是這個概念的核心部分。你的代碼里面的所有東西都是“流”,而且你需要重新構建對它的認知。今天,我們認為代碼是順序執行的,因為它也是這樣被編寫的。你寫一個指令,然后另外一個,然后你有一個循環,然后你調用一個方法,然后你返回。你把它們加入到不同的線程里面去,然后你有了并行處理。你總是需要考慮如果一個線程返回了怎么辦,而此時你已經在代碼的別的什么地方了。這非常難,特別是針對移動開發。

學習流的概念對我來說確實意味著些不同。我六個月前開始學習它,那時我剛開始在 Airbnb 里面使用 RxJava,它實在是太復雜了。我第一次看到它的時候,我剛看了一頁,就迷失在不同的概念里面了。Observables, observers,太多了。然后,慢慢地熟悉起來,你才開始理解它的一些概念了,然后它變得有意義了很多。核心思想是 一切皆是流。

太多的概念 (5:54)

我們都同意 reactive 編程是困難的,但是它卻越來越流行。我可以看到這樣的趨勢:React Native, React,還有其他的不同的庫都涌現出來。還有些新的工具,例如 Cycle, Elm,和專注 Reactive 的其他語言的工具。

然后,的確是有太多的概念需要理解了!就我看來,有兩個主要的概念:observable 和 observer,這也是你知道的兩個主要類。然后,還有 subscriber, subscription, producer, hot、cold observables, backpressure, scheduler, subject,和更多。這只是 10%。它是一個龐然大物!

如果你感覺有些混淆,沒關系,我很理解你。保持學習,保持實驗,然后所有的事情最終都會有意義的。(我不是嚇唬你,但這就是事實。)

擁抱挑戰 (7:15)

我想我可以介紹下在 Airbnb 我們是怎么做的:我們的過程,我們學到的東西,好的經驗和壞的教訓。

團隊規模 - 我們團隊有 15 個人,這是個很大的團隊了,也是我工作過的最大的團隊。每個人都在不斷的給同一個 repo(我們的安卓應用)提交代碼。 我們只有一個代碼庫,而且同事們之間互相審核代碼。每個人都了解發生了些什么而且每個人都知道你寫的代碼是什么意思,這點很重要。

我們使用 Phabricator 來審核代碼,這個和 GitHub 上的 pull request 很類似。你撰寫意見,提出建議,給出反饋,還有其他的流程。在你決定使用 RxJava 之前,讓每個人都能認同是很重要的。如果你開始使用它了,團隊里的其他人還不知道要發生些什么,而且不知道背后的原因,那么進程將是十分困難的。如果你只有兩個人,這沒關系,但是當你有更多的人加入團隊的時候,讓每個人都全速前進將會是非常有挑戰的事情。

學習曲線 - 你需要理解你常常會犯一些很低級的錯誤。你可能會寫出沒有任何意義的代碼,你可能會導致產品 crash,但是所有的事情最終都會好起來的。在我的經驗里,每個人大概花了兩個月的時間來理解 RxJava。我建議團隊一起討論它,然后盡量給團隊里面的其他成員解釋這些概念,如果你計劃采納這項技術的話。你自己盡量先能有個好的理解,然后當你感覺到成熟了,把所有人叫到屋里討論它。實戰:打開 Android Studio 然后演示一些代碼。

調試 - 這會是個大問題。每個社區里的人都知道這個問題,他們也知道這是需要改進的地方。我最近在我們的 bug 系統里面收到一個有著很多異常的堆棧日志。這是很復雜的事情,而且有很多干擾信息。我不知道是不是有人已經在積極地解決這個問題了,所以,如果有初創公司想找個可以練手的東西的話,這是個你可以開始的地方。

常見的陷阱 (11:34)

我想指出一些我們常見的攔路虎。在使用 RxJava 的過程中它們都是大麻煩。

observeOn() (11:49)

如果你想使用 RxJava,你需要知道以下的重要核心概念。

return observableFactory.<T>toObservable(this)
     .compose(this.<T>transform(observableRequest))
     .observeOn(Schedulers.io())
     .map(new ResponseMetadataOperator<>(this))
     .flatMap(this::mapResponse)
     .observeOn(AndroidSchedulers.mainThread())
     .<AirResponse<T>>compose(group.transform(tag))
     .doOnError(new ErrorLoggingAction(request))
     .doOnError(NetworkUtil::checkForExpiredToken)
     .subscribe(request.observer());

這是我們的應用中創建 RxJava observable 流的時候的一段代碼。我們調用 observeOn 兩次,看起來好像無意義。實際上,你每次調用 observeOn,后面的代碼都會運行在那個 scheduler 上,然后你之后又調用一遍,它就又切換一次。

當我們使用 RxJava 的時候,你創建了一個流。一個關于 RxJava 的誤解就是它是異步的,但是事實上每件事情都是默認同步的。當你創建一個流的時候,你僅僅是創建了一個點,這里我們會向它訂閱。當你訂閱的時候,你把所有的東西都才創建在一起了,然后才能執行它。在你調用 subscribe 之前,你僅僅是創建了一個流。這比較類似聲明的流程。當你說 observeOn,你切換到另外一個線程。如果你不調用 observeOn,每件事都還是在原來那個需要訂閱給 observable 的線程里。這里我們有 subscribe,所以如果這是從主線程中調用的話,主線程里的所有事情都會發生而不論你做了些什么。所以 observeOn 是一個有效的調用其他線程工作的方法,而且它會使過程異步化。

我們第一次調用 observeOn 的時候,我們傳入了一個 scheduler。RxJava 有一些內嵌的 scheduler,其中一個就是 I/O scheduler,這當然是和 I/O 線程工作在一起的,I/O 線程是一個和你的 I/O 綁定的線程池。map 和 flatMap 操作符在那個線程里面執行,然后當它結束的時候,我們把它發送回到主線程。所以,你正在主線程里面工作,假設這是從主線程里面調用的,然后加載到后臺線程, 最后把它移回到主線程。

如果你不使用 RxJava,這會是個非常復雜的事情。然而,現在我們有這么簡單的描述性的方法來實現你想做的事情。這也是為什么 RxJava 會很復雜的原因:這么少的代碼,但是卻要花很長的時間來真正理解里面發生了些什么。

subscribeOn() (16:14)

另外一個和 observeOn 聯系緊密的概念即使 subscribeOn。這會改變 observable 訂閱了的那個線程,如果你對這些概念不熟的話,聽起來會覺得很復雜。

return observableFactory.<T>toObservable(this)
     .compose(this.<T>transform(observableRequest))
     .observeOn(Schedulers.io())
     .map(new ResponseMetadataOperator<>(this))
     .flatMap(this::mapResponse)
     .observeOn(AndroidSchedulers.mainThread())
     .<AirResponse<T>>compose(group.transform(tag))
     .doOnError(new ErrorLoggingAction(request))
     .doOnError(NetworkUtil::checkForExpiredToken)
     .subscribeOn(Schedulers.io())
     .subscribe(request.observer());

第一個調用,observableFactory.<T>toObservable 是 observable 對象創建的地方,這也是直接受 subscribeOn 影響的代碼。還有一些運行 subscription 的代碼,當你需要向它訂閱的時候,你也有些代碼需要運行。然后你才會有那個流上的其他變化。當執行到 subscription 的代碼的時候,這些才會變化,而不是其他的地方。你什么時候調用它沒有關系,它只在 subscription 被執行的時候線程才會改變。

錯誤處理 (18:05)

return observableFactory.<T>toObservable(this)
     .compose(this.<T>transform(observableRequest))
     .observeOn(Schedulers.io())
     .map(new ResponseMetadataOperator<>(this))
     .flatMap(this::mapResponse)
     .observeOn(AndroidSchedulers.mainThread())
     .<AirResponse<T>>compose(group.transform(tag))
     .doOnError(new ErrorLoggingAction(request))
     .doOnError(NetworkUtil::checkForExpiredToken)
     .subscribeOn(Schedulers.io())
     .subscribe(request.observer());

我們使用 doOnError 作為錯誤日志的一個方法。你的網絡出現異常了,然后你想給你的分析服務注入日志,你想知道這種情況發生了多少次。doOnError 是個每次你在流上出現錯誤都會被執行的動作,然后你會有多次調用,所以你有對于一個流的多次錯誤處理。當它看見一個錯誤事件的時候,它就會調用它的方法,但是這是個副作用。

return observableRequest
     .rawRequest()
     .<Observable<Response<T>>>newCall()
     .observeOn(Schedulers.io())
     .unsubscribeOn(Schedulers.io())
     .flatMap(responseMapper(airRequest))
     .onErrorResumeNext(errorMapper(airRequest));

另一個可以被使用的結構是 onErrorResumeNext,這個工作起來像是個 catch 塊,這在 reactive 世界里就是個沒有意義的事情。這就好像你再說,“Hey,當我看到一個錯誤的時候,我想運行這個動作來撲捉這個錯誤,然后繼續執行,然后打包那個異常,寫個日志,然后返回個空的數據集合或者其他什么東西。”如果你還是 imperative 思維,這就像個 catch 塊。

單元測試 (20:04)

給同步的流數據做單元測試聽起來很復雜,所以 RxJava 提供了這個漂亮的類叫做TestSubscriber

@Test public void testErrorResponseNonJSON() {
 server.enqueue(new MockResponse()
     .setBody("something bad happened")
     .setResponseCode(500));
 TestRequest request = new TestRequest.Builder<String>().build();
 TestSubscriber<AirResponse<String>> subscriber = new TestSubscriber<>();
 observableFactory.<String>toObservable(request).subscribe(subscriber);
 subscriber.awaitTerminalEvent(3L, TimeUnit.SECONDS);
 NetworkException exception = (NetworkException)
     subscriber.getOnErrorEvents().get(0);
 assertThat(exception.errorResponse(), equalTo(null));
 assertThat(exception.bodyString(), equalTo("something bad happened"));
}

你可以使用 TestSubscriber 來訂閱你的流,然后你可以阻塞它,直到它獲得了一個事件。有一些簡便的方法,例如 .awaitTerminalEvent,這也會阻塞你的線程直到一個終端事件(例如:onCompleted 或者 onError)。對于你的流中的每個事件,你可以得到 0 次到 n 次的onNext 事件,然后當它結束的時候,你獲得 onCompleted 或者它失敗了,你獲得 onError,之后你再也收不到任何時間了,流也結束了。

@Test public void testUnicodeHeader() {
 server.enqueue(new MockResponse().setBody("\"Hello World\""));
 TestRequest request = new TestRequest.Builder<String>()
 .header("Bogus", "中華電信")
 .build();
 observableFactory.toObservable(request)
 .toBlocking()
 .first();
 RecordedRequest recordedRequest = server.takeRequest();
 assertThat(recordedRequest.getHeader("Bogus"), equalTo("????"));
}

另一件事是你可以使用 toBlocking。這會立即阻塞線程,這在單元測試里面十分有用。當然,作為產品代碼用處不大。如果你使用 RxJava,你不太可能會阻塞你自己線程,但是在測試的時候就非常方便了。這會比使用測試 subscriber 代碼量少點。如果你知道不會失敗,你可以直接阻塞然后獲得第一個事件。

在這個例子里面,我們使用了 OkHttp 來 mock 響應,然后發送假的響應回去。我們增加了一個前綴,然后測試一些前綴的特殊字符,如果這是個 OkHttp 的 bug 的話。然后我們正確清理前綴,就可以正常測試了。

內存泄漏 (22:41)

如果你做移動開發,你知道內存異常的所有情況。

當你向一個流訂閱的時候,你得到了一個 subscription。當你得到這個 subscription 之后,你可以注銷它,所以你需要顯示地釋放資源。你不在需要引用那個流了。我們都知道發起請求的重要性,例如,從安卓的 activity 或者 fragment 中發起請求。你不要忘記了,你想在 acitivity 銷毀的時候釋放這些資源,這是一個常見的模式。

private final CompositeSubscription pendingSubscriptions =
    new CompositeSubscription();

@Override public void onCreate() { pendingSubscriptions.add( observable.subscribe(observer)); }

@Override public void onDestroy() { pendingSubscriptions.clear(); }</code></pre>

你可以使用 CompositeSubscription,這是個能夠集合多個訂閱的類,而且你給它增加一個訂閱。然后,一旦你銷毀 activity,你就能清除它了。

附加資源 (23:55)

問&答 (23:35)

問:當有錯誤的時候你們重試的機制是怎樣的?例如向服務器發起的取數據的網絡請求。

Felipe: 在我們的實際中,我們沒有重試機制;我們就是失敗。但是如果你想使用它,一個方法就是使用 onErrorResumeNext。你可以繼續另一個請求,然后處理多次。如果你獲得一個錯誤,繼續兩個不同的請求然后你能實現兩次,這樣你可以在你需要的時候重試兩次。這是一個方法。我沒有真正地用 RxJava 實現它,所以我沒有一個特別的建議。

問:在 Java 8 中,lambdas 不需要擁有一個它被創建的上下文的引用,但是在安卓里面,我認為他們需要?安卓沒有真正實現 Java 8。我很好奇你們在處理交還引用中的關于內存泄漏的經驗。

Felipe: 據我所知,關于內存管理,我們沒有遇到過 lambdas 的問題。這很令人吃驚。我們使用Retrolambda 這是個 Java 8 的 lambdas 到 Java 7 字節流的移植,因為安卓不能支持 Java 8。把字節流改成和 Java 7 兼容是種 hack 的方法。我沒有遇見任何問題,我的 lambdas 代碼都運行正常。

問:我不知道你們有沒有跟蹤 RxJava repo,但是 Ben Christensen 十月份離開去為 非死book 工作了,自從那以后,項目就進入了無人問津的狀態。現在它被另外一個人維護著。你們有這方面的擔心嗎,把 Rxjava 框架作為開發平臺的長期可行性?

Felipe: 這是事實。據我所知,Netflix 承諾分配人來做這件事,但是看起來還沒有開始。David Karnok,獨自維護這個產品的人,真的非常非常聰明。事實是,RxJava 1.0 就很穩定了。我不認為我會擔心未來的 bug 修正或者安全性或者其他類似的問題。未來幾年都不會有問題。關于未來的開發工作,比如 2.0,這是未知的事情。我們不知道會發生什么。我對 1.0 很滿意,因為它非常穩定。

問:你能說明下拆分流的技術嗎?現在有許多合并流的資料,我能找到的唯一一個關于分拆的技術是 publish-connect 模型,而且我感覺這更適合增加多個訂閱而不是拆分 observable 為多個可觀察的流。

Felipe: 正確,所以以前我使用 share,這是使得一個 observable 被多人訂閱的方法,正如我說的那樣。我不記得 share 和 publish-connect 具體的差別了,它們本質上是同樣的事情。我們用它來允許多人訂閱同一個 observable,但是我真不知道我們能這樣做的其他的方法。

問:你提到一個難點是 hot 和 cold observables。你有相關的材料或者在你的代碼里面的命名來解釋下誰是誰嗎,就像,“哦,驚喜!你的訂閱有許多副作用!”

Felipe: 這是你需要理解的又一個概念。我們不是真有這樣的問題因為大部分我們使用 RxJava 的情況是圍繞 Retrofit 的,這也是非常直接的。你訂閱它,然后你發起一個網絡請求。我不是真的有太多的關于它的問題,因為我也沒有接觸那么多。但是當 RxJava repo 有一個好的 Wiki,他們有許多很棒的文檔,而且 ReactiveX 網站也有許多好東西。

問:當你決定轉向 RxJava 的時候,你決定把整個代碼庫里的代碼都遷移過去。你能談些 Airbnb 在這期間的挑戰嗎?這僅僅是人的問題,或者是別的什么?

Felipe: 我會說大部分是人的問題,包括我,因為我也是在學習。當你還在學習的時候,你還要教會別人這個工具,這是個問題。解釋有些概念實在太難了,因為它們是如此的復雜。在我看來最大的挑戰是讓每個人都能全速前進。我們沒有許多的產品問題。我們有一些 crash 而且是本地使用的原因,我記不太清了。我們有些繼承的網絡部分的建立是使用的 Volley,然后我們轉向了 Retrofit。我們有許多圍繞著那周邊的繼承的代碼,像請求類,而且它還不是真正地符合 Retrofit 工作的方式的,所以我們實際上是 fork 了 Retrofit 然后讓它在我們當前的設置里工作的。然而,大部分挑戰是讓每一個人都理解它。這需要時間,你不能一撮而就。

問:你提到 RxJava 是困難的。現在回過頭去看你們跨越的旅程,你認為,為了你提到的這些優點而做的妥協是值得的嗎?

Felipe: 我會認為這是值得投資的。這當然是個大投資。我們需要花費大量的時間學習,然后我認為這一定是值得的。一旦你使用的越來越多,你得到的好處也就越多。當你開始結合流而且重新賦予他們意義的時候,你會看著而且想,“嘿,我用一小段代碼實現了許多的事情。” 當你到達這個階段的時候,這是個特別神奇的事情。所以我覺得認為這是值得的。

問:你在所有的事情中都用了 RxJava 嗎,或者你們有一個分界線,只有在某種復雜的異步任務中你們才會采用 RxJavava?

Flipe: 我們呢,只在我們的網絡層使用它。我們沒有給 UI 暴露這些細節,因為那會是個更大的賭注。讓每一個人都理解,然后知識真正被運用,因為如果你給你的 activity 層或者你的 view 暴露它,這就意味著所有人都需要理解它,運用它。在網絡層,只有少數的同事接觸它,所以你不需要所有人都理解它。我們只暴露 API,然后它就工作了。這就好比一座橋,我們在這端用 RxJava,然后另外一端是普通的大陸。這是有局限性的,雖然它是我們應用的核心。

來自:realm

 本文由用戶 ChesterTeg 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!