RxJava 介紹

QFRPorfirio 8年前發布 | 10K 次閱讀 RxJava 安卓開發

RxJava 是一個了不起的 Android 工具,它使得代碼的編寫和維護更加容易,而且更少出錯,但是作為一個新的安卓開發者,卻不容易理解 RxJava 到底做了些什么事情使得編寫代碼更加容易了。在這篇 360|AnDev 演講中,我們會討論傳統的安卓開發方法,這些方法容易出錯的地方和 RxJava 是如何解決這些問題的。除了提及 Rxjava 的初衷,這篇演講也提供了一些常見的 RxJava 的使用實例,和一些第一次使用 RxJava 的常見陷阱。這篇演講不需要 Rx 基礎。

我的名字叫 Christina,我來向大家介紹 RX。在我們深入介紹之前,先做個警告,我覺得許多介紹的演講都流于形式,是這樣,這很簡單。使用這么簡單的一兩行代碼,你的世界就變得容易很多。我真的很想做這樣的一個演講。我也想這樣說。

但是不幸的是,RX 可不是這么簡單。RX 是復雜的,有趣的和有意思的。在許多方面都有意義,可不是簡單。所以,請容忍我,這里有許多內容。我將盡我所能使得大家容易理解,但是可能不是那么容易。它也不是一行代碼,我希望這不會太有關系。

背景

在我深入談論 RX 之前,讓我們看看目前的情況,因為如果目前的情況沒有問題的話,你也不需要解決方案。今天有兩件事情讓我覺得有趣,為什么我們需要異步操作?安卓對異步操作的支持是怎么樣的?

第一個問題很容易回答。因為用戶是我們的朋友,而且我們關心它,我希望他們能有好的體驗。如果你使用異步操作的話,你可以利用服務器的能力,那些因為移動設備的限制而不能實現的,需要真實的后臺進程才能處理的事情,就可以實現了。你可以給你的用戶提供美妙的體驗,而不會拖慢他們的速度,凍屏或者跳屏都不會出現了。

關于第二個問題,你有許多的工具可以使用。這里沒有辦法給你一個全集。我們沒有辦法討論所有的安卓中異步相關的東西,但是我們會涉及其中流行的部分。我可以肯定的是,你們之前一定聽過 Async Task,而且很可能使用過它,當然還有 Futures、Event busses和 Observables,這是我們今天要討論的內容。這當然不全面,所以我提供了一個更多信息的博客,以便你在演講后詳細地閱讀它。它討論了 Swift 和安卓的所有事情,基本上是所有的事情。

我想強調的是當我說 Futures 的時候,我會使用規范的形式來談及 Futures,我不是指的內嵌的 Futures。我指的是 ListenableFuture 。 對于內嵌版本的 Futures,有著許多錯誤的地方會迷惑你。這里,我將使用 Futures 來代表 ListenableFuture ,當然它的底層是 Futures,但是添加了許多有用的函數。

我們有許多工具,而且雖然我們有許多的工具,可最后只會使用一到兩個,這樣我們就需要評估這些工具。我不會站在這說我有最佳評估矩陣的衡量標準,但是我會提出我的觀點。我考慮的三個方面是:__何時運行,怎么運行和影響誰。__當我從這三個方面考慮問題的時候,它就會告訴我那些工具是合適的。

如果我們把 Async 作為例子,你可以注意到 AsyncTask 需要被顯式執行。這很棒,因為這意味著我們能夠控制它什么時候開始。這個工作在 AsyncTask 里面就不容易實現了,所以關于如何運行,是你激活了它,如果你想構建什么東西的話,你可以在一些回調函數里面實現它們。這不理想,但是把它們聚合在一起不容易。關于輸出,許多數據是一個 “副作用”。

如果你不熟悉 副作用 ,首先,這可是個很棒的感念。我建議你們去了解了解,其次,在醫學上,它有著必然的因果關系。如果我吃感冒藥的話,目標就是治愈我的感冒。如果它讓我變綠了,就是副作用。這不是目標,我不想變綠。

同樣的例子,如果你想做些事情,然后它就開始從網絡中讀取數據,或者它改變狀態,而且這都是不確定的,這就是副作用,因為你不能通過你的函數來知道到底會發生什么。

private class SomeTask extends
        AsyncTask<ParamsType, ProgressType, ResultType> {

    //some implementation here
    protected ResultType doInBackground(ParamsType params)

    protected void onPostExecute(ResultType result)
}

new SomeTask().execute(params)

如果你觀察這段代碼,你能看到最后一行是我們顯示執行的。你可以看到一個 doInBackground 函數,這里我們有些實現,然后當它執行完畢的時候, onPostExecute 會執行,但是你不確定你會如何從這里開始執行事情,或者是一旦你開始了,它們是如何改變事情的。

如果你有評估準則,你會有一些理想情況的假設和目標,否則的話,評估準則的意義是什么呢?我打算提供一些理想的情況,這是我們的目標。

首先,顯式執行。如果你能開始一些事情的話,你就能控制它。對于 Future 來說,這不總是正確的,當你創建它們的時候,它們就運行了。你希望盡可能的控制它們。在這個例子里面,它能幫你,因為你能創建它,在它運行之前控制它。

你還希望簡化線程管理。本次演講是關于異步行為的。線程管理是關鍵,如果你想簡單的話。你希望它們是能被轉化的,而且副作用越少越好。

我將會深入談到每一點,但是基礎是一樣的。顯示執行意味著你能控制這些可觀察者。你能創建它們,存儲它們,修改它們,然后在它們完美的時候,在它們得到它們需要的數據的時候,它們和你期望的一樣的時候,你可以馬上執行它,而不是過一會。強調一遍,關于線程管理,這是異步操作。如果你不能容易地切換線程,或者理解你在哪個線程,這就會很糟,因為你會因此而出錯。

我們也想容易點,因為它易讀,這對寫代碼的時候很有用。關于簡單組合,異步工作能獨立完成就好了。當我們執行的時候,網絡操作不依賴任何東西,我們不需要在網絡返回時做任何事情,我們做了,然后忘記它,這樣很棒。但是現實不是這樣,而且事情常常是交互的,因為它們是互相依賴的,所以它們希望是協同工作的。如果你使用緩存,然后網絡,你不希望它們之間還有中間步驟。你希望它們是串起來且具有意義的。如果你的異步庫是可組合的,它就不容易出錯,這是件好事情,這樣我們就勝利了。

最后,最小化副作用。副作用越小,你的代碼就越經得起質疑,你的代碼就越易讀,其他人也容易閱讀你的代碼,以為它不依賴于狀態,它是確定的,而且解釋起來也很容易。

Rx 

我設置了這些標準。你可能知道事情該如何發展了。我個人覺得 RX 能滿足這些條件,讓我來告訴你原因。RX 是顯式執行的。你如果想讓一些事情發生的話,你需要注冊。在那之前,你都是在構建或者創造或者存儲,事情沒有被啟動。

使用 RX 非常容易理解你是在哪個線程上執行的,因為那里有單行操作符 subscribeOn 和 observeOn ,這里你可以配置線程。如果你看看代碼,你可以看到 observeOn IO 線程,這是發生在 IO 上的。它們很容易改變,過濾,對應和其他各種各樣的操作。這是它真正閃光的地方。消除副作用,這也很棒。這是異步的工作,所以它們從來都沒有擺脫他們。你在使用網絡,你在改變定義的狀態,但是它做了非常好的嘗試來減少它們的影響。

關于 RX 的快速的事實。首先,RX 是為異步事件創建的。也是專門為聚合構建的。其次, RX 是 Reactive Extensions 。再次,RX 支持許多平臺。我今天的演講是關于安卓__RxJava__,你也可以在 JavaScript 里使用它。也可以在 Swift 里面使用。你能在不同的語言里面使用它。

RX 的三個關鍵點是現在在屏幕上的三點。RX 有可觀察者,有 LINQ 查詢,有調度器。LINQ 查詢是基于這些概念的,雖然它不是顯式使用的。在可觀察者、查詢和調度者之間,我們有了 RX 的世界。

深入理解下,可觀察者代表著異步數據流。這對于剛剛接觸 RX 的人來說常常非常難以理解。所以我會晚點深入談談。但是現在,就先假設它們是流。查詢允許你能在這些流上面執行操作,調度,而且不出意外地,你能控制這些流的一致性。

回顧一下:這些是 RX 的三個核心部分。我們能表示異步數據流,我們能查詢,我們能用操作合并這些流,而且我們能用調度來協同這些流。

Observable

回到我們的第一部分:Observable。Observable,如我所說,是數據流。它們是拉數據的方式,需要注意的是你可以采用 RX 里面的高級功能來實現推送數據模式,但是一般情況下,它都是拉數據的。你能夠創建它,存儲它,然后傳遞它們,等等。因為它們是顯式執行的。最后一點是我特別想強調。

Observable 能幫助我們隱藏所有線程和同步的復雜細節。這不是說復雜性不存在了,只是庫幫你解決了這些問題,而且我們希望它們處理的很好,因為它們是全職在做這些事情。RX 常常非常困難,就是因為這些流和 Observable 的概念。

我昨天晚上和一些人聊天。我問他們如何理解流,這對他們來說意味著什么,有人說 類比工廠 是它真正有意義的地方。我把這個觀點拋出來。希望這能幫助到一些人。如果不行,就把他們當作是流,希望這樣就能理解了。

類比工廠意味著你開始有一些原材料。比如你在生產汽車,你的工廠來了鋼鐵。這是原材料,這就是我們的網絡請求。我們在某處開始,然后一旦你有了原材料,你把它們送給傳送帶。你可能會把它壓成金屬板,或者壓成汽車的形狀,或者做些類似的事情。你可以也需要引入其他工廠的一些資源。也許你需要一些橡膠來做輪胎。當你在這些傳送帶上執行的時候,你構建它,增加東西,最終當汽車出現在門外的時候,你有了最終的產品,在我們的例子里面,就是從流里面獲取了數據。

所以我們放進一些東西,一些原材料,我們用不同的方法構建它,然后我們得到我們想要的產品。

兩個關鍵的可觀察者的生存周期階段,我這么叫它,就是你想往里放數據和你想往外獲取數據的時候。你還有其他想要流的原因嗎?就是數據。看看第一部分,放入數據。你有許多不同的方法能夠實現,我會展示一些。最簡單的就是 observable.just 。

Observable.just("Hello World!")

這是一個串,我只想要這個,所以我放了進去。

val names: Array<String> =
    arrayOf("Christina", "Nicole", "Alison")

//Will output Christina --> Nicole --> Alison --> X
Observable.from(names)

第二部分,你能在 Observable 上使用迭代器。這里我有一個我的一些朋友名字的數組:Christina, Nicole,和 Alison。我可以說 Observable.from 這個可迭代對象,然后從這個可觀察者中返回的就是 Christina, Nicole, Alison,然后就結束了。

String[] names = {"Christina", "Nicole", "Alison"

//Will output: -> Christina -> Nicole -> Alison -> x
Observable.from(names)

這個創建過程最重要的部分就是使用 .create 。

Observable.create<String> { s ->
    s.onNext("I created an observable!")
    s.onCompleted()
}

這里你可以發現發生了些什么,我使用注冊 S ,我特別地管理了設置的東西。在這個例子里面,我提到了注冊,在 onNext 里發送我在 Observable 里面的字符串。我不想在這里放置更多的字符串,所以我讓它告訴你完成了。然后我說 注冊,你結束了 。這是一個 Observable。任何事情都沒有發生,但是當我創建一個的時候,這個的行為是,它發出字符串,然后結束。

Observable.create(new OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber)
        subscriber.onNext("I created an Observable!"
        subscriber.onCompleted();
    }
})

你可能注意到這和 Java 不一樣。這是使用__Kotlin__完成的代碼。我將使用 Kotlin,這樣我能保持代碼的一致性,但是我將盡可能地解釋它。

還有一個 onComplete ,這有一點瘋狂,因為我們實現了所有的這些 onNext ,我們使用的是數組,然后突然一下,就有了 onComplete 。 結束 意味著什么?這甚至不是放入數據,那是拿出數據,你是對的。這就是一個從可觀察者里拿出數據的機會,如果你不理解在一個流的結束的位置,你是如何接收數據的,那么你也不會理解是如何放入數據的。

我們從流里還需要什么?主要是數據。我們想有一個方法說 嘿,下部分數據是什么? 我使用了網絡,我能得到什么?我們也想知道,還有剩下的數據嗎?因為如果沒有了,我們就結束了,我將會繼續,我將會回收資源。我們真的想知道:我還需要關心這個流嗎?或者我能繼續嗎?

最后,因為現實世界不是完美的 (我們存在的痛苦),有錯誤發生嗎,我需要知道嗎?代碼里面看起來是怎樣的?我想要下一個數值的時候,有 onNext 。我知道當有事情結束的時候,有 onComplete ,而且我知道當有事情出錯的時候,有 onError 。

希望這些名字是有意義的。它們是可讀的。這是 RX 讓我我最喜歡的原因之一。這里沒有什么令人吃驚的地方。下一個值, onNext 。結束 onComplete 。錯誤, onError 。 ```java

Observable.create { s -> val expensiveThing = doExpensiveComputationHere() s.onNext(expensiveThing)

val otherExpensiveThing = doOtherExpensiveComputation()
s.onNext(otherExpensiveThing)

//all done
s.onComplete() }
這些在實際工作中會是如何?重申一次,我們有頂層的函數 `create`,所以我們說 `observable.create`。在這個例子里面,我用 `string` 類型,所以這是一個可觀察的對象,而且生產一堆不同的字符串。你可以,你知道,在這里放上一個不同的類型。

而且,讀這些 Kotlin 代碼就像在讀偽代碼,你有一些注冊,然后我們有一個開銷很大的函數。也許它會從磁盤讀取內容或者其它事情。讓我繼續,然后在有數值返回的時候再處理。一旦我有數值了,讓我通過調用 `onNext` 把它放到那個流里面。我們調用這個函數,得到數值,然后我們說,`s.onNext` 那個值。也許我們還沒完成,也許現在我想訪問網絡,或者我想做另外一個磁盤讀取,或者誰知道要發生什么。這里有些其他開銷嚴重的函數。我會調用它,然后我會獲取值,我能使用 `onNext` 把它推送到流上。

也許我只有那兩個開銷大的事情,也許我有更多的。在這個例子里面,我們在這兩個調用后就結束了,所以我會告訴流,結束了,而且我會說 `onComplete`。再詳細點解釋每個步驟,你可以如你所愿多點或少點調用 `onNext`。現在,我的屏幕上有一個星號,因為它不是一個完整的真實的語句。而且在那種情況下,我下面會談到一些性能問題,如果你太頻繁的這樣使用的話,你可能會遇到這些問題。作為一個 RX 的介紹,你可能不會遇到它,但是這是要注意的地方,如果你真的有繁重的處理的話,你可能會遇到性能問題,而且你想小心地面對它,然后正確地處理它。

一個關于 `onNext` 的小測驗。我真的想確保每個人都理解了。有人能告訴我這個流看起來是什么樣子嗎?這不是一個困難的問題。它真的是和你想的一樣簡單。兩個和三個,準確地說,就是這!它放入了兩個,放入三個,然后結束了!類似的,輸出也一樣。這開起來像 *蘋果,香蕉,橘子,菠蘿*,然后結束了。回家后嘗試著這樣做,你也能做到的。你可以實現一些二進制數。它將會看起來一樣。你會一個個地輸出,然后流就會關閉。我感覺關于 `onNext` 有許多的迷思。它真的就是這么簡單。

談到 `onError`。這真的是關鍵了,這是我將會詳細講到的一點。如果你需要了解一件事的話,我希望就是這件,因為它會省去你許多的麻煩。`onError` 和 `onComplete` 都結束流。它們都是流的結束事件。唯一的不同就是當你調用 `onError`,流正在結束因為有些事情出錯了;當你調用 `onComplete`,流正在結束因為你結束了你的工作,這時不需要做任何事情。但是兩者都會結束流,不同的地方就是流結束的原因。

一個 RX 的 `onError` 偉大的地方是所有的錯誤都在一個地方處理。你可以綁定一組不同的可觀察者,出現的任何錯誤都會出現在你的 `onError` 里面。如果它發生在上游,如果它發生在下游,它都會來到你的 `onError` 里面。這和 Futures 里面的東西不同,Futures 有獨立的成功和失敗的回調,然后也許你需要告訴另外一個 Future 這個 Futures 的錯誤,那么你該如何傳播這個錯誤呢,然后巴拉,巴拉,巴拉。這個錯誤會傳播回到最底層,所有的錯誤都在那里。如果你想找錯誤,它就在那里。看這里,如果你想要錯誤的話。這是它的要點。

```java

Observable.create<String> { s ->
    try {
        val result = functionThatMightError()
        s.onNext(result)
    } catch (e: Error) {
        s.onError(e)
    }
    s.onCompleted()
}

回到終止事件的地方,這里有個例子,我想逐步地通讀一遍來看看發生了什么。我有一個 Observable,我們自己創建的。我們在這里放了許多的 try-catch,因為這里有個函數會出錯。我不知道它做了些什么,也許是訪問網絡,但是它會出錯。所以,在這個例子中,我們假設他工作正常,我們調用它,然后我們調用 onNext ,傳入參數,然后我們執行 onComplete 。但是這不是函數 errors 的情況。

Observable.create<String> { s ->
    try {
        val result = functionThatMightError()
        s.onNext(result)
    } catch (e: Error) {
        s.onError(e)
    }
    s.onCompleted() //<--NOPE
}

這和前面的 Observable 創建一樣,但是這次讓我們假設那個函數出錯 一定出錯 。你可能希望它會馬上調用 onNext ,然后 onError ,然后 onComplete 。但是記住, onError 是流的終止事件,所以你會得到 onError 。這是我們不期望的,所以停一停,讓我們想想。在 onError 之后,流結束了,沒有任何地方可以調用 onComplete ,因為那是流成功的結束。我們已經是不成功的結束了。 onComplete 和你想的一樣。它告訴你流結束了。繼續執行而且為你自己清理現場吧,回收你持有的任何東西然后繼續。

回顧:我們有我們的 onNext ,我們有我們的 onComplete ,我們有 onError 。每一個函數都完成你希望它們完成的功能:傳播下個數據片,告訴你流已經結束了,或者告訴你流的一部分有一個錯誤,就是這樣。

如果你記得我在 LINQ 查詢討論的東西的話,關鍵點是:Observable 和許多異步方法的顯著不同是,它們在代碼中的組合的時候和各種轉換的時候都是非常有用的。

操作符

現在我們來談談操作符。我們有許多的操作符。我不可能向你們介紹所有的操作符。你有過濾的操作符,采用一些元素然后跳過一些元素的操作符,映射的操作符,水平映射的操作符,你能想到的任何東西,都有操作符的存在。它們在實戰中看起來如何?

Observable.from([1,2,3,4]).map { num ->
    num + 1
}

Output: 2 --> 3 --> 4 --> 5

如果你看看這個,數組的 observable.from 和我們在之前的演講里面看到的一樣。如果你也有這樣的 Observable,它可以發射數字 1,2,3,4 然后它會停止。

Integer[] numbers = {1, 2, 3, 4};
Observable<Integer> obs = Observable.from(numbers).map(
    new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer integer) {
            return integer + 1;
        }
    }
);

//Outputs: 2 --> 3 --> 4 --> 5 --> x

這里調用它的 onComplete 。雖然這里我們增加了額外的異步,就是 map 。這個 map 接收一個變量,所以函數有一個給定的參數叫做 number。我們想做的事情就是我們想說的事情,返回數字加一。一進來,我們有一了,我們說,但是我們不想要一了,我們想要一加一,所以二被輸出了。然后二進入流,我們有了二加一,所以是三。然后突然地,一個 Observable 可以輸出 1,2,3,4,現在輸出2,3,4,5。

Observable.from([1,2,3,4]).filter { num ->
    num % 2 == 0
}

Output: 2 --> 4

我們也可以用 filter . 這里,我們有一個數據,然后返回一個布爾表達式,如果數字模二為零的話返回真。如果不是的話,就不給流返回。這里模二是一,所以不能繼續。二模二是零,所以可以繼續。這樣我們有輸出 1,2,3,4;然后通過過濾,變成 2,4。

Integer[] numbers = {1, 2, 3, 4};
Observable<Integer> obs = Observable.from(numbers).filter(
    new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer % 2 == 0;
        }
    }
);

//Output: 2 --> 4 --> x

這只是一個 “給我事件” 的數字函數。我們也能做些事情,比如組合和合并 Observable。

val schoolFriendsObs =
    Observable.from(arrayOf("Mo", "Dave"))
val workFriendsObs =
    Observable.from(arrayOf("Nicole", "Alison"))

val allFriendsObs = Observable.merge(
    schoolFriendsObs,
    workFriendsObs
)

我沒有那么多朋友。他們都在這里了。你可以看到第一個數組是我學校的朋友。我有 Mo 和 Dave。太棒了。然后我有些工作的朋友。我有 Nicole 和 Alison。也有一個關于他們的 Observable。然后在底部,我組合了我所有的朋友。讓我從合并過后的學校和工作的朋友中創建一個 Observable。這個輸出看起來像這樣。

//Note: Output is only in the same
//order because input was synchronous
I/RxKotlinHelper: onNext(Mo)
I/RxKotlinHelper: onNext(Dave)
I/RxKotlinHelper: onNext(Nicole)
I/RxKotlinHelper: onNext(Alison)
I/RxKotlinHelper: onComplete()

我們有 Mo,我們有 Dave,我們有 Nicile, 我們有 Alision,然后結束了。

Scheduler

所有 Observable 的元素都在這個輸出里面了,然后我們結束了,因為沒有剩下任何東西了。現在,我給你們留下一個筆記。這里的數據是順序的,是因為它們是字符串的名字,所以它們是 同步的 。如果你在處理網絡,那么順序可能會是 Nicole, Mo, Dave, 然后 Alison。它們不一定像原來的順序。

繼續最后一部分,Scheduler。這也是有意思的地方,因為我們談到異步工作。如果你不注冊,什么都不會方式。重申一次,你能看到星號是因為我騙了你。這里有個警告。高級性能問題。你現在不需要理解它,但是當你真正需要考慮這個問題的時候,是有許多值得思考的地方的。

但在我們的上下文中,在 Java 結束的上下文中,如果你不注冊,什么都不會發生。我之前提到 observable.create 的地方,然后我放置了輸出,那不是完全正確的。那是這些可觀察者能夠輸出的內容,但是它們沒有真正地輸出,因為我從來都沒有說過在它們身上 注冊 。我創建了一個對象,然后如果你在它上面注冊的話,它會輸出那些東西。

關于注冊者的一些東西。它們能接受不同數量的函數。它也可以不接受函數,如果你只想啟動它,而不關心返回的是什么的話。它們能接受三個函數, onNext , onComplete ,和 onError 。最大三個,最少零個。

我將停在這里,然后強烈建議你總是傳遞 最少 一個 onError ,因為如果你不這樣做,你的堆棧將完全不可讀,你會恨你自己的。請,最少,傳入 onError 。這很重要,如果有什么事情出錯了,你能理解哪里出了問題。

然后,關于工作該在哪里執行,我們有兩個不同的操作符,它們是 subscribeOn 和 observeOn ,我將使用這兩個操作符。請知曉,這是我們管理合適開始的方式。

代碼看起來像什么樣子呢?

val names = arrayOf("Christina", "Nicole", "Alison"

Observable.from(names).subscribe(
    { next ->
        Log.i(TAG, "onNext($next)")
    }
)

我有一個同樣名字的數組。調用 observable.from 這個數組,然后最后一行是新的代碼。我說 .subscribe ,然后在這個例子里面,我只傳入了一個函數。我傳入了 onNext 函數,然后那行代碼說,下一個我能得到的數據片,打印它。就是這樣。

輸出看起來像這樣:

I/RxKotlinHelper: onNext(Christina)
I/RxKotlinHelper: onNext(Nicole)
I/RxKotlinHelper: onNext(Alison)

它打印了名字,然后什么都沒有發生。現在,如果我們想更嚴格,我們可以傳入三個函數。

val names = arrayOf("Christina", "Nicole", "Alison"

Observable.from(names)
    .subscribe(
        { next ->
            Log.i(TAG, "onNext($next)")
        },
        { error ->
            Log.i(TAG, "onError($error)")
        },
        {
            Log.i(TAG, "onCompleted()")

這是個完整的函數集合。我們傳入 onNext ,這是第一個函數。我們傳入 onError ,打印錯誤,然后最后一個函數是 onComplete ,我們將打印 onComplete。這次輸出看起來很熟悉。

I/RxKotlinHelper: onNext(Christina)
I/RxKotlinHelper: onNext(Nicole)
I/RxKotlinHelper: onNext(Alison)
I/RxKotlinHelper: onCompleted()

雖然我們增加了一行,就是 onNext ,所以和以前一樣,但是現在我們增加了 onComplete ,所以當結束事件發生的時候,我們有一個輸出。這就是最底下的 onCompleted 。

我目前為止忽略的是 .subscribe 會返回一個注冊。你可能不需要用到這個注冊,但是它應該是有一個注冊的引用的,這不奇怪。你可以在它上面調用 unsubscribe ,然后就會有這樣的東西:

val sub: Subscription = SomeObs.subscribe{ next ->
    Log.i(TAG, "onNext($next)")
}
sub.unsubscribe()

我在一些 Observable 上面調用 subscribe ,這是我上一個幻燈片上面創建的東西,然后我打印了 onNext 里面發生的事情。它會再一次打印名字或者其它東西,然后如果你不關心它是怎么工作的話,我也不愿意監聽它了,我能夠調用 unsubscribe ,因為我有這個注冊的引用。你不需要引用,但是在你自己不需要的時候釋放你曾經 subscribeOn 的東西,是個很好的習慣。

這就是 Observable 和所有注冊過程的內容。那么線程呢?我的工作在哪里運行呢?

回憶一下,我們有這兩個操作符,我們能使用它們來告訴 Observable 它們應該從哪里開始工作。 subscribeOn 只應該使用一次。你可以不止一次地加它,但是不會起作用。 subscribeOn 會在你創建可觀察者的最近的位置起作用。如果你在主線程的第一行調用 subscribeOn ,那么 10 行代碼后,你調用了 IO 線程,它會在主線程上注冊。它會忽略接下來發生的事情。你只能使用一次,就這。如果你不顯式說明它應該從何注冊的話,它就會發生在線程上。而且默認的是你創建可觀察者的線程。這常常是主線程,但是如果你在計算線程或者其它線程上創建可觀察者的話,你需要把 subscribeOn 放到你確定能運行的地方,因為它默認會在計算線程,這是它運行的地方。

關于 subscribeOn 的關鍵點是你代碼的第一部分總是在你的 subscribeOn 上執行的。在你注冊的那點上,代碼會開始在那個線程里執行。也許這行代碼之下,你在做其他的事情,然后你先有一個 observeOn 來改變線程,這沒問題。但是 subscribeOn 告訴你你的代碼從哪里開始執行。第一塊運行的代碼總會執行 subscribeOn 直到你有機會改變它,如果你這樣選擇的話。

這會把我們導向 observeOn 。也許你已經在主線程上注冊了,也許它是一個計算線程,但是現在你想顯示些東西,放在 UI 上,所以你想切換回主線程。這是 observeOn 能幫上忙的地方。你可以無限次地這樣使用。

這里有個可怕的警告。這會有些壓力的問題。重申一次,如果你不是真的在處理性能要求嚴格的事情,你可能不會這樣做,但是如果你是,這樣做可以。

理解 observeOn 影響接下來的所有事情很重要。如果我在一個 10 行 Observable 鏈條上的第 2 行放上一個 Observable 的話,它會從第 3 行開始影響起。除非我在第 6 行再放一個 Observable,或者 observeOn 在第 6 行,在這個例子中,Observable 現在會作用于剩下的所有事情了。每一個你的 observeOn 下面的東西都會在你告訴 observeOn 的任何線程里發生。我們看看代碼。

注冊,監聽

我將會逐行解釋。

Observable.from(arrayOf("Red", "Orange", "Blue"))
    .doOnNext { color ->
        Log.i(TAG, "Color $color pushed through
                on ${Thread.currentThread()}")
    }.observeOn(Schedulers.io()).map { color ->
        color.length
    }.subscribe { length ->
        Log.i(TAG, "Length $length being recieved
            on ${Thread.currentThread()}")
    }

我們開始于一個 Observable。你已經看過千百萬次了。他將會輸出一組顏色。就是這樣。現在我將要增加一個 doOnNext 。你還沒有看到它,但是 doOnNext 是可以自解釋的。當 onNext 發生的時候,我將執行我提供的這個函數。在這個例子中, 我提供的函數就是打印日志。我想把它在執行的線程打印出來。這就是這里發生的內容。在 onNext 發生的時候打印線程。

現在,也許其他的事情會發生了,我不想再在這個線程里了,所以我放置了一個 observeOn ,然后我說,讓我們切換到 IO 線程吧。我會做些事情比如映射它。我有一個顏色的字符串。也許我只關心字符串的長度。這很重要。我會說:接收這個串,然后給我這個串的長度而不是串本身。然后你就完成所有的產品了。我增加了 subscribe 然后它打印出我最后得到我的輸出的線程。輸出看起來是怎么樣的?

I/Rx: Color Red pushed through on Thread[main,5,main]
I/Rx: Color Orange pushed through on Thread[main,
I/Rx: Color Blue pushed through on Thread[main,5

I/Rx: Length 3 being
    recieved on Thread[RxIoScheduler-2,5,main]
I/Rx: Length 6 being
    recieved on Thread[RxIoScheduler-2,5,main]
I/Rx: Length 4 being
    recieved on Thread[RxIoScheduler-2,5,main]

它從主線程開始,因為我沒有給它一個 subscribeOn ,我在我的例子應用的主線程里面編寫了這個 Observable。在我切換到 IO 線程的時候,不出意外的,它開始在 IO 線程上輸出內容。你可以看到最下面的三行代碼,它說我們在 IO 調度器上。現在,讓我來使用一個顯示的 subscribeOn 。

Observable.from(arrayOf("Red", "Orange", "Blue"))
    .doOnNext { color ->
        Log.i(TAG, "Color $color pushed through
            on ${Thread.currentThread()}")
    }.observeOn(Schedulers.io())
    .map { color -> color.length }
    .subscribeOn(Schedulers.computation())
    .subscribe { length ->
        Log.i(TAG, "Length $length being recieved
            on ${Thread.currentThread()}")
}

我不想在主線程上運行這個事情。這還是那個我們一行行建立起來的 Observable,但是有一個 subscribeOn 調用,我把它放到計算線程中去。它會怎么改變事情的運行呢?

//Output trimmed to fit

I/Rx: Red pushed on Thread[RxComputationScheduler]
I/Rx: Length 3 recieved on Thread[RxIoScheduler]

I/Rx: Orange pushed on Thread[RxComputationScheduler]
I/Rx: Length 6 recieved on Thread[RxIoScheduler]

I/Rx: Blue pushed on Thread[RxComputationScheduler]
I/Rx: Length 4 recieved on Thread[RxIoScheduler]

現在你可以看到第一部分的工作做完了,它們是在計算調度器上做完的。在那之后,一旦我們切換到 IO 線程上,我們把它們映射到鏈接上,這會在 IO 調度器上完成。這是一個你能看到的例子,我們能在一個地方開始工作,我們能切換到不同的調度器上,而且它能在其他的地方完成。我還能增加一個不同的 observeOn 來把它放回主線程,然后你就能在輸出里面看到交互運行了。

Rx 和 Android

RX 和 Android。我還沒有開始討論過 Android。我們講到了可觀察者和一般意義上的 RxJava。讓我們談談這怎么影響安卓開發者的生活。重申一次,這和運算符類似。我不可能告訴你們所有的 RxJava 在你們應用里面的應用方式,因為篇幅是有限的。你是不是應該使用它又是另外一個話題,但是你可以在不同的地方使用它。

這里有些有點麻煩的例子。你可以綁定點擊,然后過濾某種模式,或者你通過你的可觀察者的聚合器來判斷人們是不是在雙擊。你可以 FlatMap 網絡緩存命中。你可以在一個單一流里面處理鑒權的問題,所以你可以得到用戶輸入然后你發送給網絡,然后你得到返回值。也許第二次需要網絡,獲取用戶詳細信息。所有這些事情都發生在一個單一流里。

我將串講一些例子。在我這樣做之前,我想說,它們只是幫助你們看看在安卓應用里面是如何實現的。如果你不能理解它們,沒關系。這里只是給你們一些感覺,看看什么是可能的。

fun Button.debounce(length: Long, unit: TimeUnit) {
    setEnabled(false)

Observable.timer(length, unit)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        setEnabled(true)
    }
}

這里有個去抖按鈕的例子。我不確定你為什么要這樣做,但是你可以這樣做!這里發生的事情是,我給一個按鈕類增加了一個叫做 debounce 的東西。我可以給它一些時間單位,然后我開始 setEnabled 為 false 。我會設置一個時鐘觀察者。它會超時。當它超時的時候我設置 setEnabled 為 true 。

在 setEnabled 為 false 的時間里面,用戶不能點擊這個按鈕。然后在我的時鐘超時后, onNext 被調用, Enabled(false) 變為 true,然后用戶可以再次點擊了。

fun size(): Observable<Int> = Database.with(ctx)
    .load(DBType.photoDetails)
    .orderByTs(Database.SORT_ORDER.DESC)
    .limit(24)
    .map { it.size() }

我在我的照片分享應用里面使用過這個,我們在數據庫里面有許多照片。我們使用的一個場景是 - 這是個少見的例子 - 我會使用 context 來得到我的數據庫,然后我說:嘿,加載些照片的詳細信息吧。我真的只關心最近的照片。你能幫我按照時間順序排序嗎?然后,順便提一句,我不能一次處理許多照片,所以給我最后 24 張就好了。這是我真的關心的東西。然后也許我只關心照片的大小。我想在 UI 上做些東西,誰知道呢。然后最后一行說,好吧,不是照片的細節,給我照片的大小。映射成為大小。

這看起來很緊湊。如果沒有 RX,這會是 5 頁代碼。它會非常冗長。我不得不在代碼間跳來跳去。你能在 6 行代碼里完成這件事情是件非常了不起的事情。這是真的很復雜而且非常容易閱讀。

return Observable.create<Unit> { s ->
    val outputFile = writeOutputFile(mediaFile)

    when (type) {
        Type.Photo -> addPicToGallery(ctx, outputFile)
        Type.Video -> addVideoToGallery(ctx, outputFile)
        else -> {
            s.onError(Error("Unexpected download type!"))
    }
}
s.onNext(Unit)
s.onCompleted()

你可以對 IO 做這樣的規范示例。你可能下載一些東西。你有一些函數來寫文件。我知道那兒發生了什么,但是我得到了一個文件。也許我們需要檢查類型。如果是圖片,就放到另外的地方。如果是視頻,放到另一個地方。如果都不是,那發生什么呢?拋出一個錯誤。然后,在這個例子里面,我們不會通過流輸出數據,因為我們不關心。

我們常常使用這個可觀察者的地方,就是 toast 說 “下載完成” 或者 “下載失敗” 我們不需要從那個流里發送文件,因為沒有用。我們關心的是,這個過程完成了嗎,或者失敗了?

你會發現我們有 onError 。如果有錯誤,如果我們不知道文件類型,我們不知道怎么處理它,而且如果不知道,我們就只能調用 onNext 而沒有任何參數。 Unit 這里是 null 類型,所以它說調用 onNext ,但是我不會給你的 onNext 函數任何參數。它將會退出。然后,我們說,我們結束了。封裝它。你可以看到有一個 subscribeOn IO,所以你可以放置一個 subscribeOn 。

fun codeObservable(): Observable<String?> {
    val filter = IntentFilter(SmsUtility.INTENT_VERIF_CODE)
    return ContentObservable.fromLocalBroadcast(this
        .map { intent ->
            intent.getStringExtra(SmsUtility.KEY_VERIF_CODE)
    }
}

還有些方法可以從本地廣播里面提取信息。所有那些給你驗證碼的應用,它們給你短信驗證碼,然后你需要把它們輸入到你的應用中去,這是你認證的方式。安卓很偉大,是因為如果你讀他們的短信的話,你會嚇到用戶。這是個真實發生的例子。

有人給你發送一個驗證碼。你自動讀取短消息,抽出驗證碼然后把它放到 LocalBroadcast 中。但是 LocalBroadcast 會有點笨拙,所以讓它繼續,然后把它變成一個可觀察者。這里發生的是,我們有 IntentFilter 來獲取驗證碼,而且我們把它傳給 ContentObservable.fromLocalBroadcast 。

我們不需要 intent。這不是我們感興趣的地方。我們感興趣的就是驗證碼,所以我們肯定這是個正確的。最后一行說:你給我了一個 intent,但是我真正關心的就是驗證碼。把它抽取出來,然后你可以看到返回類型是 String? 。這意味著 String 會存在而且也可能是 null。如果那個關鍵值不在我的 intent 里面,它會返回一個 null。如果在,就會返回驗證碼,然后我可以做些進一步的驗證。

timedAuthObservable
    .observeOn(Schedulers.io())
    .flatMap { code ->
        userModel.sendVerifyResponse(code)
    }.flatMap {
        userModel.getSuggUsername().onErrorReturn {
    }.observeOn(AndroidSchedulers.mainThread())
    .subscribe({ suggestedUsername -->
        //update UI with suggested username
})

你可以構建些真的很復雜的首次流程。這是偽代碼,但是 timedAuthObservable 是這樣的情況,你進入輸入密碼的環節,看看它們是不是有效,所以你看到進度圈。但是也許你的網絡很差,所以它會出現連接失敗,你感覺很討厭。這是一個 timedAuthObservable 。

你可以在 IO 線程實現它,因為你需要網絡。然后也許你接下來想做些事情。什么事情不重要。我增加了一些偽代碼,正如你想驗證你的返回值一樣,現在你登錄了,你想獲取一個建議的用戶名,這樣你就能使用它們建議的名字完成首次登錄。

與這里發生的事情相比, 如何 發生更重要。這是你在 8 行代碼里完成的所有鑒權的過程,而且你能很好地管理這些代碼在哪里運行,僅僅通過 observeOn 和 subscribeOn 。

//Verify with backend, then prepare data for UI
override fun getVerifiedData(code: String):
Observable<Unit>

    return UserService.noAuthClient
        .verifyUser(authToken, code)
        .flatMap {
            UserService.authClient.fetchUserDetails()
        }.map{ data ->
            loadableUserState.loadFromData(data)
        }.observeOn(AndroidSchedulers.mainThread())
}

你也可以完成這樣的事情。如果你不鑒權,啟動一個 IO 事件來認證,而且一旦你認證了,你可以調用一個函數,現在你已經認證了,然后獲得了認證通過的回復。沒有可觀察者,實現這些不太容易,因為你有些東西需要經過驗證,然后你需要存儲它們,而且你需要另外啟動一個任務來驗證這些結果。

現在你開始沒有驗證已經不重要了,因為你會開始使用 FlatMap 用戶驗證客戶端服務,你被認證了,因為這是流工作的方式。你從流的頂部到底部,所以在你的函數被調用的時候,你就已經被認證了。或者拋出了一個錯誤,你會知道拋出的是那個錯誤的。這很強大,而且看起來也沒有很多代碼,但是你可以從一個沒有登錄的用戶開始,你能在你的 IO 之后開始你的登陸后的相關流。

結論

讓我們總結一下。我不是來告訴你們, RX 是唯一的解決問題的方式,而且總是正確的方法。在這個演講中,你們能發現有許多不平常的問題,而且容易引起混淆。這不是我想說的。我想說的是 RX 是目前最強大的工具。當然它可能不適用于所有的情況,但是如果你是在做高性能的東西,如果你做的東西和網絡相關,或者你從磁盤經常讀數據,或者你正在做許多異步工作,它的確是個正確的工具。

它的開銷不小,但是帶來的好處也不少,它能做許多事情。這里有一個學習曲線。我知道。這很難受。這好像是在玻璃上行走,正如我們昨晚討論的一樣。這不舒服。但是一旦你掌握了它,它會幫你減少錯誤。

一旦你學會這樣思考,代碼就會變得更加易讀因為它們被封裝在了一個邏輯的時間鏈條中。你可以看到有些東西訪問網絡,被轉換,然后放到 UI 線程來被你的 UI 使用。這是個鏈條。這個鏈條代表了整個動作而且你不需要引入額外的機會產生錯誤,比如引入狀態存儲。

最后,我覺得流會帶來巨大的范式轉換。我們不這樣思考,是因為這不是眼下大多數應用構建的方式,但是如果你能把數據想象成為流,你整個應用的數據流就會浮出水面。緊接著,當你使用其他的異步原語的時候,發生的事情就是你給一個線程發送一些東西,然后這個線程就在那里了,然后你的 activity 做了一些事情,你就可以在不同的地方獲得異步數據了。

如果你使用的線程強制聯系在一起,然后你能輕松地寫一個 GUI,會發生什么。這些東西原來就有,當你啟動一個網絡事件,你可以看到它貫穿你的整個應用,你所有的問題就是你的 UI 被限制住了,或者放到別處等晚點使用。第一個網絡調用,你帶入其他數據的轉換,或者過濾數據,或者減少數據,所有的事情能夠在 GUI 里面出現。這很強大,因為它給了你的應用一個方式,這個方式每個人都能理解。

如果你有一個路線圖,你知道你在什么地方,那就真的很有用了,不僅僅是開發者,對產品的人也有用。告訴它們數據流是怎么工作的,哪里獲得數據,它們如何改變,在哪里結束。

最后,這是我可以給 RX 的最好的總結。如果你有一輛汽車,你有一個老舊的汽車,你將要做的事情就是去商店,這也不錯。馬路的限速是多少,25 MPH?你可以這么說。也許座位不是很舒服,但是你可以調整它。也許不能發動了,你可以扭扭鑰匙,這都沒問題。它能工作,就很好,就很棒。

然而,如果你對旅行還有好奇,或者你想從高速公路出去,我保證如果你有一個 Tesla 和一輛老汽車,你會意識到兩者的不同,因為馬力不一樣,安全性也不一樣。它學習了過去 20 年的駕駛經驗,所以 Tesla, 有一個更好的安全記錄,并且會世界聞名。這是因為它們看到了過去發生的事情,它們改正了這些錯誤,然后在新的技術上重新構建。

如果你想開著 Tesla 去商店,你可能不會注意到一個不同,因為 25 MPH 不是這個汽車建造的目標。你不需要加速從 0 到 60 去雜貨鋪。這可能不是你最好的選擇。但是如果你開車穿過全國,如果你想做更大的事情,更復雜的東西,更持久的東西,更能紀念的東西,Tesla 是你想要的。它適合這種場合。它會讓你的生活更愉悅,更安全。而且坦率地說,有更多的樂趣。

 

 

來自:https://realm.io/cn/news/360andev-christina-lee-intro-rxjava-java-android/

 

 本文由用戶 QFRPorfirio 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!