Architecting Android with RxJava
最近,我抽出了幾個晚上的時間,把咖啡和啤酒變成了代碼與文字。
引子
三個月以來,我翻譯了一些關于 RxJava 的文章,說實話這些翻譯,真的搞得我很頭疼,那么現在是時候回來寫點什么了。
最近,我在看兩本書, 《Learning Reactive Programming with Java 8》 , 《RxJava Essentials》 ,不過,沒關系,我已經買到了電子版,我會在文章結尾附上網盤鏈接和密碼,但我還是希望你將文章繼續讀下去,因為那是文章結尾的事。
其實關于 RxJava 的文章和消息遠不止我們能了解到的,但又拜英語所賜,所以它看起來又沒那么多。好在,國內有許多優秀的開發專家 hi大頭鬼hi , BlackSwift , 程序亦非猿 , Drakeet ,扔物線,流火楓林等等在為之做著貢獻,以及簡直不能更優秀的文章 《給 Android 開發者的 RxJava 詳解》 。
但是,現在,我不得不再次做啰嗦一下, RxJava 究竟會改變我們什么。
響應式編程Reactive Programming
什么是響應式編程呢?在Java程序中:
int a = 4; int b = 5; int c = a + b; System.out.println(c); // 9 a = 6; System.out.println(c); // 9 again, but if 'c' was tracking the changes of 'a' and 'b', // it would've been 6 + 5 = 11
當我們改變“a”和“b”的值時,“c”并沒有改變。話句話說就是,“a”和“b”的改變并沒有響應到“c”。總結起來就是: 程序以流的形式,傳遞數據的改變 。
那我,我們又為什么需要響應式呢?
以下翻譯自《Learning Reactive Programming with Java 8》
10-15年前,對于網站開發來說,最平常的日常工作就是進行維護和縮短響應時間,那么今天,一切程序都應該保證七天二十四小時不間斷運行,并 且能夠極快的做出響應;如果你的網站響應慢或者宕機,那么用戶將會對你們真愛一秒變備胎,轉而選擇其他網站服務。當今的慢意味著不可用甚至是有故障的。如 今的互聯網是在和大數據打交道,所以我們需要快速的處理數據。
過去的幾年中HTTP錯誤已經不是什么新鮮事了,但是現在,我們不得不進行容錯機制,還要提供用戶易讀以及合理的消息更新。
在過去,我們寫簡單的桌面應用,但如今我們寫能夠做出快速的響應Web應用。多數情況下,這些應用要與大量的遠程服務器進行數據傳遞。
如果我們想讓自己的軟件保持競爭性,就不得不實現這些新需求,所以,換言之就是我們應該這樣做:
- 模塊的/動態的:用這種方式,我們就能夠擁有一個七天二十四小時的系統了,因為這些模塊能夠在不停止整個系統的情況下進行脫機和聯機。另外,隨著系統的不斷龐大,還能幫助我們更好地組織應用結構,同時還能管理底層代碼。
- 可擴展的:用這種方式,我們就能夠處理大量的數據和用戶請求了。
- 容錯性:用這種方式,能夠為用戶提供穩定的系統。
- 響應式:這不僅意味著快速,還意味著可用性強。
讓我們思考如何實現它:
- 如果我們的系統是事件驅動型的,那就把它模塊化。我們可以將系統分成多個彼此之間通過通知進行交互的微服務/組件/模塊。這樣,我們就能夠以通知為代表,響應系統的數據流了。
- 可擴展意味著能夠應對日益增長的數據,在負載的情況下不會崩潰。
- 對故障/錯誤做出及時的響應,能夠提高系統的容錯性。
- 響應意味著對能夠對用戶操作及時的做出反應。
如果應用是事件驅動型的,那么,它就能夠解耦成多個自包含組件。這能夠幫我們更好的實現擴展性,因為我們總是可以在不停掉或者打斷系統的情況下 添加新組建或者移除舊組件。如果錯誤和故障傳遞給正確的組件,把它們當做通知來處理并作出響應,那么應用能變得更具有容錯性和彈性。所以,如果把系統構建 成事件驅動型的。我們可以更容易的實現擴展性和容錯性,而且一個具有擴展性,低耦合和防錯的應用能夠快速的響應用戶操作。
Reactive Manifesto 文檔定義了我們剛剛提到的四點響應式準則。每一個響應式系統都應該是消息驅動型(事件驅動型)的。這樣它不僅能變得低耦合,而且擴展性和容錯性將更高,這就意味著它可靠和具有響應式。
要注意的是, Reactive Manifesto 只是講述了一個響應式系統,并不是對響應式編程的定義。當然,你也可以不使用任何響應式類庫或者語言,打造一款彈性可擴展,具有消息驅動的響應式應用。
應用程序中數據的變化,以通知的方式傳遞給正確的Handler。所以,使用響應式寫應用
回調地獄
如果你是一個能夠時刻保持頭腦清醒,邏輯清晰和思維縝密的人,是個Callback高手,善用并且能夠用好FutureTask。
那么在Android中你的代碼可能會頻繁的使用async+callbacks,或者service composition+error handing,developer productivity。
那么關于異步回調的邏輯,你會寫成這樣getData(Callback<T>)、這樣Future<T> getData(),還是這樣Future<List<T>> getData(),甚至這樣Future<List<Future<T>>> getData(),嗷!拜托,我簡直不能再舉例下去了,這簡直就是 Callback Hell ,這樣的程序或許寫起來很舒服,但是如何測試和維護呢。
如果哪天你的程序出了問題而必須馬上修復,但你能馬上趕來或者需要別人協助(這在很多公司是很常見的),或者他人在review你的代碼,那么,是時候拿出這張圖了。
然而使用 RxJava 的操作符,我們可以避免這些煩人甚至糟糕的回調,讓結構和思路看起來更清晰,通過組合API,只需要約定最終的結果Observable<T>就行了。
并且scheduler的出現,不僅解放了線程的切換,讓UI線程與工作線程間的跳轉變得簡單,而且,它的API很豐,也提供了很多使用常見的建議,比如,適用計算任務的 Schedulers.computation(?);處理密集IO任務的Schedulers.io(?);以及 Schedulers.trampoline(?)能夠有效避免StackOverflowError,所以非常適合函數的遞歸調用。好了,我不再舉例 了,因為 官方文檔 已經給出了很詳細的解釋了,但是值得一提的是,如果使用Schedulers的工廠方法創建的Worker,一旦任務執行完畢,都應該調用worker.unsubscribe( )方法,然后轉向之前定義的Scheduler實例上來。
當然 RxJava 的出現并不僅僅是為了解決回調地獄的。
這是我通過學習和不斷地練習,一路走來很辛苦,總結的一些經驗,分享給大家:
1 . error handling
3 . caching (roation)
@Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setRetainInstance(true); /*.cache()操作符: 當第一個subscribe訂閱的時候,才會連接原始Observable,緩存事件,重發給后續訂閱的subscribe 值得注意的事,它和使用了.replay().publish()操作符的ConnectableObservable的不同。 另外,為了避免內存開銷,不建議緩存大量事件*/ cacheObservable = weatherManager.getWeather().cache(); } @Override public void onViewCreated(View view, Bundle savedInstanceState) { super.onViewCreated(view, savedInstanceState); cacheObservable.subscribe(/*your subscribe*/); }
4 . composing multiple calls
5 . more robust interface than asyncTask
6 . easy to do complex threading
7 . functional nature is more expressive
/*一個數組,每個元素乘以2,然后篩選小于10的元素,放入集合中*/ Integer[] integers = { 0, 1, 2, 3, 4, 5 }; /*一般寫法,看上去并不是那么的“函數”*/ Integer[] doubles = new Integer[integers.length]; for (int i = 0; i < integers.length; i++) { doubles[i] = integers[i] * 2; } List<Integer> integerList = new ArrayList<>(doubles.length); for (Integer integer : doubles) { if (integer < 10) integerList.add(integer); } /*Observable寫法,一切都好多了*/ List<Integer> funactionalList = Observable.from(integers).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { return integer * 2; } }).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer < 10; } }).toList().toBlocking().first();
9 . fluent API
10 . easy debugging
//值得一提的是,關于@RxLogSubscriber要放在繼承自Subscriber的類上 @RxLogSubscriber class MySubscriber extends Subscriber<Void> { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Void aVoid) { } } //而不是實現Observer接口的類上 @RxLogSubscriber class MySubscriber implements Observer<Void> { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Void aVoid) { } }
當然,隨著學習的深入,你會發現,收益不止如此。
在響應式編程中,應該牢記以下兩點:
-
everything is a stream(一切皆流)
-
don't break the chain(不要打斷鏈式結構)
談談Backpressure
Android這種嵌入式系統,尤其是生產者-消費者( producer-consumer )模式中,一定要小心Backpressure(背壓,反壓)的出現。一個寬泛的解釋就是:事件產生的速度比消費快。一旦發生 overproducing,當你的鏈式結構不能承受數據壓力的時候,就會拋出MissingBackpressureException異常。
在Android中最容易出現的Backpressure就是連續快速點擊跳轉界面、數據庫查詢、鍵盤輸入,甚至聯網等操作都有可能造成 Backpressure,可能有些情況并不會導致程序崩潰,但是會造成一些我們不想見到的小麻煩。那么一起來看看如何用RxJava解決 Backpressure ,OK,讓我們的程序變得健壯起來吧。
groupBy操作符
在寫這篇文章的時候,剛好看到一段代碼,看來有必要說一說這個操作符了。
.groupBy( ) ,分組操作符,雖然目前這個項目中沒有用到,但是我還是蠻喜歡它的,而且我看到很多人在使用,將原始Observable根據不同的key分組成多個GroupedObservable,由原始Observable發射(原始Observable的泛型將變成這樣Observable<GroupedObservable<K, T>>),每一個GroupedObservable既是事件本身也是一個獨立的Observable,每一個GroupedObservable發射一組原始Observable的事件子集。
引用自: GroupBy中文翻譯
注意:groupBy將原始Observable分解為一個發射多個GroupedObservable的Observable,一旦有訂閱, 每個GroupedObservable就開始緩存數據。因此,如果你忽略這些GroupedObservable中的任何一個,這個緩存可能形成一個潛 在的內存泄露。因此,如果你不想觀察,也不要忽略GroupedObservable。你應該使用像take(0)這樣會丟棄自己的緩存的操作符。如果你 取消訂閱一個GroupedObservable,那個Observable將會終止。如果之后原始的Observable又發射了一個與這個 Observable的Key匹配的數據,groupBy將會為這個Key創建一個新的GroupedObservable。
那么問題恰恰出在 .take(n) 操作符上。
只返回前面指定的n項數據,然后發送完成通知,忽略后面的事件。
那么看一下這個例子:
Observable.just(0, 1, 2, 3, 4, 5).groupBy(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; } }).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<Integer>>() { @Override public Observable<Integer> call(GroupedObservable<Boolean, Integer> groupedObservable) { return groupedObservable.getKey() ? groupedObservable.take(1) : groupedObservable; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer i) { System.out.println(i); } });
輸出結果:
然而在 1.0.0-RC5 之前的版本中,在GroupedObservable上使用.take(n)操作符將會在發送完n個事件后,對GroupedObservable進行 unsubscribe。并且GroupedObservable內部將會記錄這個unsubscribed狀態,然后忽略后面的事件。所以輸出結果將是 這樣的:
而在這之后的版本,使用.take(n)操作符,雖然也會發生unsubscribe,但是當原始再次Observable發送一個滿足key的事件后,將會重新創建一個GroupedObservable,然后發送這個GroupedObservable,不會發生之前那樣的,忽略后續事件的現象。
當然,不要忘記,對不感興趣的GroupedObservable使用.take(0),來避免泄露。
所以,我的建議是,在使用RxJava之前看看官方文檔或者change log。
關于RxWeather
我盡量減少對這個工程的文字描述。因為代碼才是最好的老師。
通過對 Android技術棧,1#架構 ( 譯文 )和 Android架構演化之路 ( 譯文 )的解讀和學習,按照架構和思路進行了實現,并且加入了RxBus。
關于REST API,我選擇了 和風天氣 ,而放棄了 Openweathermap 的理由如下:
-
Openweathermap免費用戶所在的服務器不穩定。
-
付費方面,和風天氣更經濟實惠。
但是和風天氣目前并不支持同時查詢多個地區的天氣預報,也不支持根據經緯度查詢天氣預報。但是以后的事情誰又能說的準呢?
由于應用并不支持動態的上拉加載。所以,所有的列表展示結果,取決于 city.txt 文件。
我從Openweathermap給出的 資源 (下載city.list.json)中,整理需要的城市Json字符串,整合了經緯度,以備不時之需。
找到了一個通過Location查詢所在地的API。
就這樣基本實現了列表展示頁 ListActivity 的功能:
-
根據Loaction查詢所在地城市名稱,然后查詢當地天氣。
-
讀取domain->assets->city.txt,然后依次查詢每個城市的天氣,所以,這個文件不建議放入太多json。
-
整合1和2的并發請求結果,顯示界面。
詳情頁 DetailActivity 通過 RxBus 發送黏性事件接收列表頁傳遞過來的數據,然后進行展示。這里會有七天內的天氣以及穿衣建議。由于我么并沒有找到一個正確的算法,所以當進入詳情頁后,旋轉 屏幕之后的退出動畫會有所不同。這個類涉及的代碼大部分都是動畫(注意Hardware Layer的使用)以及對屏幕旋轉的處理,所以代碼看起有點多。 ForkView 使用了一個簡單的自定義Behavior。
搜索界面 SearchActivity ,輸入的關鍵字請不要以市、區結尾,例如,北京而不是北京市,因為API不支持,我也沒辦法 :( 。
啟動頁
我認為,出彩的引導頁是對細節的重視,但是我實在不能忍受,在啟動頁等太久。注意:不要混淆這兩種場景。
所以,我在看了 正確使用啟動頁 之后,決定采取這種方式實現 SplashActivity 。而且不建議使用大圖,一個icon足以。
Code
All the code is available on Github .
片尾Tips:
文章開頭提到的資料,需要pdf或者kindle版本的請自行選擇下載,不得用于商業用途。
Learning Reactive Programming with Java 8 - pdf版,提取密碼:2d88。
Learning Reactive Programming with Java 8 - kindle版,提取密碼:5nec。
RxJava Essentials - pdf版,提取密碼:z3r8。
RxJava Essentials - kindle版,提取密碼:l67e。