手把手教你使用RxJava 2.0

MaybelleSwa 7年前發布 | 28K 次閱讀 RxJava 觀察者模式

網上有很多關于RxJava的技術文章,大多數是關于1.x版本的。隨著 RxJava 2.0 的推出,有些文章也介紹了2.x版本新增的內容以及與1.x版本的對比。有些同學如果剛剛接觸RxJava,僅僅看RxJava 1.x 的一些技術文章,有時候會有些出入。因此本篇文章基于RxJava 2.0 進行由淺入深的學習,逐步掌握RxJava。

1.作用

RxJava的目的就是 異步

RxJava的特點就是可以非常簡便的實現異步調用,可以在邏輯復雜的代碼邏輯中以比較輕易的方式實現異步調用。隨著邏輯的復雜,需求的更改,代碼可依然能保持極強的閱讀性,在深入的使用過程中一定對這點深有體會。

2.工程引用

要應用RxJava,需要在項目中引入依賴:

io.reactivex.rxjava2:rxjava:2.0.4
io.reactivex.rxjava2:rxjava:2.0.4

3.概念

要想理解好RxJava,首先要理解清楚其中的幾個關鍵概念。由于RxJava是利用觀察者模式來實現一些列的操作,所以對于觀察者模式中的觀察者,被觀察者,以及訂閱、事件需要有一個了解。如果不理解觀察者模式,不要緊,下面會詳細介紹。

Observable:在觀察者模式中稱為“被觀察者”;

Observer:觀察者模式中的“觀察者”,可接收Observable發送的數據;

subscribe:訂閱,觀察者與被觀察者,通過subscribe()方法進行訂閱;

Subscriber:也是一種觀察者,在2.0中 它與Observer沒什么實質的區別,不同的是 Subscriber要與Flowable(也是一種被觀察者)聯合使用,該部分內容是2.0新增的,后續文章再介紹。Obsesrver用于訂閱Observable,而Subscriber用于訂閱Flowable

4.RxJava中的觀察者模式

觀察者模式的概念很好理解,具體可以解釋為:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。

在程序的觀察者模式,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態),而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。

下面具體講RxJava 的觀察者模式

RxJava 有四個基本概念:Observable (被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現訂閱關系,從而 Observable 可以在完成某些操作,獲得一些結果后,回調觸發事件,即發出事件來通知 Observer。

關于回調,如果理解則可以跳過這一段,如果不理解,在RxJava中可以簡單的理解為:為了方便Observable和Observer交互,在Observable中,將Observer對象傳入,在完成某些操作后調用Observer對象的方法,此時將觸發Observer中具體實現的對應方法。

注意:Observer是個接口,Observable是個類。

與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() 之外,還定義了三個特殊的事件:onComplete() 和 onError(),onSubscribe()。

onComplete(): 事件隊列完結時調用該方法。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。

onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。

onSubscribe():RxJava 2.0 中新增的,傳遞參數為Disposable ,Disposable 相當于RxJava1.x中的Subscription,用于解除訂閱。

注意:onComplete() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。

講了這么多,大家會疑惑:這些都跟異步有什么關系?

其實這都是在為異步進行鋪墊。當大家理解了觀察者模式之后,就會很容易理解RxJava的異步實現方式。讓Observable (被觀察者)開啟子線程執行耗操作,完成耗時操作后,觸發回調,通知Observer (觀察者)進行主線程UI更新。如此輕松便可以實現Android中的異步,且代碼簡潔明了,集中分布。RxJava中默認Observer (觀察者)和Observer (觀察者)都在同一線程執行任務。本文主要介紹RxJava中的一些基本使用,關于線程調度問題下篇文章再進行介紹。即本文中的所有操作都默認在同一線程進行。

好了,下面我們就開始了解RxJava的一些基本使用。

5.基本的用法

RxJava用法多種多樣,其多樣性體現在Obserable(被觀察者)的創建上。

我們先以最基礎的Obserable(被觀察者)的創建為例介紹RxJava的使用:

Observable的創建:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //執行一些其他操作
                //.............
                //執行完畢,觸發回調,通知觀察者
                e.onNext("我來發射數據");
            }
        });

Observer的創建:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            //觀察者接收到通知,進行相關操作
            public void onNext(String aLong) {
                System.out.println("我接收到數據了");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

訂閱:

observable.subscribe(observer);

使用create( )創建Observable最基本的創建方式。可以看到,這里傳入了一個 ObservableOnSubscribe對象作為參數,它的作用相當于一個計劃表,當 Observable被訂閱的時候,ObservableOnSubscribe的subscribe()方法會自動被調用,事件序列就會依照設定依次觸發(對于上面的代碼,就是觀察者Observer 將會被調用一次 onNext())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

Observable的其他創建方式:

just()方式

Observable<String> observable = Observable.just("Hello");

使用just( ),將為你創建一個Observable并自動為你調用onNext( )發射數據。通過just( )方式 直接觸發onNext(),just中傳遞的參數將直接在Observer的onNext()方法中接收到。

fromIterable()方式

List<String> list = new ArrayList<String>();
        for(int i =0;i<10;i++){
            list.add("Hello"+i);
        }
        Observable<String> observable = Observable.fromIterable((Iterable<String>) list);

使用fromIterable(),遍歷集合,發送每個item。相當于多次回調onNext()方法,每次傳入一個item。

注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的實現類都可以作為Iterable對象直接傳入fromIterable()方法。

defer()方式

Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just("hello");
            }
        });

當觀察者訂閱時,才創建Observable,并且針對每個觀察者創建都是一個新的Observable。以何種方式創建這個Observable對象,當滿足回調條件后,就會進行相應的回調。

interval( )方式

Observable<String> observable = Observable.interval(2, TimeUnit.SECONDS);

創建一個按固定時間間隔發射整數序列的Observable,可用作定時器。即按照固定2秒一次調用onNext()方法。

range( )方式

Observable<Integer> observable = Observable.range(1,20);

創建一個發射特定整數序列的Observable,第一個參數為起始值,第二個為發送的個數,如果為0則不發送,負數則拋異常。上述表示發射1到20的數。即調用20次nNext()方法,依次傳入1-20數字。

timer( )方式

Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);

創建一個Observable,它在一個給定的延遲后發射一個特殊的值,即表示延遲2秒后,調用onNext()方法。

repeat( )方式

Observable<Integer> observable = Observable.just(123).repeat();

創建一個Observable,該Observable的事件可以重復調用。

除了Observable(被觀察者)的創建之外,RxJava 2.x 還提供了多個函數式接口 ,用于實現簡便式的觀察者模式。具體的函數式接口包括以下:

以Consumer為例,我們可以實現簡便式的觀察者模式:

Observable.just("hello").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

其中Consumer中的accept()方法接收一個來自Observable的單個值。Consumer就是一個觀察者。其他函數式接口可以類似應用。

注意:Observable (被觀察者)只有在被Observer (觀察者)訂閱后才能執行其內部的相關邏輯,下面代碼證實了這一點:

Observable<Long> observable = Observable.interval(2, TimeUnit.SECONDS);
        Observer<Long> observer = new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                    System.out.println(aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        SystemClock.sleep(10000);//睡眠10秒后,才進行訂閱  仍然從0開始,表示Observable內部邏輯剛開始執行
        observable.subscribe(observer);

01-18 16:09:20.874 12535-12927/com.lvr.rxjavalearning I/System.out: 0

01-18 16:09:22.864 12535-12927/com.lvr.rxjavalearning I/System.out: 1

01-18 16:09:24.864 12535-12927/com.lvr.rxjavalearning I/System.out: 2

01-18 16:09:26.864 12535-12927/com.lvr.rxjavalearning I/System.out: 3

除此之外,RxJava中還有許多操作符。 操作符就是用于在Observable和最終的Observer之間,通過轉換Observable為其他觀察者對象的過程,修改發出的事件,最終將最簡潔的數據傳遞給Observer對象。 下面我們介紹一些比較常用的操作符。

6.RxJava中的操作符

map()操作符

Observable<Integer> observable = Observable.just("hello").map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return s.length();
            }
        });

map()操作符,就是把原來的Observable對象轉換成另一個Observable對象,同時將傳輸的數據進行一些靈活的操作,方便Observer獲得想要的數據形式。

flatMap()操作符

Observable<Object> observable = Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        });

flatMap()對于數據的轉換比map()更加徹底,如果發送的數據是集合,flatmap()重新生成一個Observable對象,并把數據轉換成Observer想要的數據形式。它可以返回任何它想返回的Observable對象。

filter()操作符

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).filter(new Predicate<Object>() {
            @Override
            public boolean test(Object s) throws Exception {
                String newStr = (String) s;
                if (newStr.charAt(5) - '0' > 5) {
                    return true;
                }
                return false;
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println((String)o);
            }
        });

filter()操作符根據test()方法中,根據自己想過濾的數據加入相應的邏輯判斷,返回true則表示數據滿足條件,返回false則表示數據需要被過濾。最后過濾出的數據將加入到新的Observable對象中,方便傳遞給Observer想要的數據形式。

take()操作符

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).take(5).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                System.out.println((String)s);
            }
        });

take()操作符:輸出最多指定數量的結果。

doOnNext()

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).take(5).doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("準備工作");
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                System.out.println((String)s);
            }
        });

doOnNext()允許我們在每次輸出一個元素之前做一些額外的事情。

以上就是一些常用的操作符,通過操作符的使用。我們每次調用一次操作符,就進行一次觀察者對象的改變,同時將需要傳遞的數據進行轉變,最終Observer對象獲得想要的數據。

以網絡加載為例,我們通過Observable開啟子線程,進行一些網絡請求獲取數據的操作,獲得到網絡數據后,然后通過操作符進行轉換,獲得我們想要的形式的數據,然后傳遞給Observer對象。

以上僅僅是介紹RxJava的觀察者模式以及RxJava的簡單操作與使用。通過本篇文章,可以對RxJava有個簡單的了解。后面我會繼續介紹RxJava中線程調度的內容,以及RxJava 2.x 中新增的功能。如果大家喜歡這部分內容,可以持續關注,后面會繼續更新。

 

來自:http://www.jianshu.com/p/d149043d103a

 

 本文由用戶 MaybelleSwa 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!