RxJava 教程第一部分:入門之 關鍵的類

ylta2603 8年前發布 | 14K 次閱讀 RxJava Java開發

 

Rx 有兩個最基礎的類型,和其他一些擴展這兩種類型的類。兩個核心的類為: ObservableObserverSubject 是同時繼承了 Observable 和 Observer。

Rx 是在 Observer 模式 之上建立起來的。這種模式很常見,在 Java 中有很多地方都使用了該模式,比如 JavaFx 中的 EventHandler。 這些簡單的使用方式和 Rx 對比有如下區別:

  • 使用 event handler 來處理事件很難組合使用
  • 無法延時處理查詢事件
  • 可能會導致內存泄露
  • 沒有標準的標示完成的方式
  • 需要手工的來處理并行和多線程

Observable

Observable 是第一個核心類。該類包含了 Rx 中的很多實現,以及所有核心的操作函數(operator、或者說 操作符)。在本系列教程中會逐步介紹每個操作函數。現在我們只需要理解 subscribe 函數即可,下面是該函數的一種定義:

public final Subscriptionsubscribe(Subscriber<? super T> subscriber)
 

該函數是用來接收 observable 發射的事件的。當事件被發射后,他們就丟給了 subscriber, subscriber 是用來處理事件的實現。這里的 Subscriber 參數實現了 Observer 接口。

一個 Observable 發射 三種類型的事件:

– Values (數據)

– 完成狀態,告訴 Subscriber 事件(數據) 發射完畢,沒有其他數據了

– Error, 錯誤狀態,如果在發射數據的過程中出現錯誤了。會發送該事件。

Observer

Subscriber 是 Observer 的一個實現。 Subscriber 實現了其他一些額外的功能,可以作為我們實現 Observer 的基類。現在先看看 Observer 的接口定義:

interface Observer<T> {
    void onCompleted();
    void onError(java.lang.Throwable e);
    void onNext(T t);
}
 

每次 Observable 發射事件的時候就會執行這三個對應的函數。Observer 的 onNext 函數會被調用0次或者多次,然后會調用 onCompleted 或者 onError。在 onCompleted 或者 onError 發生以后就不會再有其他事件發射出來了。

在使用 Rx 開發的過程中,你會看到很多 Observable,但是 Observer 出場的時候很少。但是理解 Observer 的概念是非常重要的,雖然有很多簡寫方式來幫助更加簡潔的使用 Observer。

實現 Observable 和 Observer

你可以手工的實現 Observer 或者擴展 Observable。 在真實場景中并不需要這樣做,Rx 已經提供了很多可以直接使用的工廠方法了。使用 Rx 提供的工具來創建 Observable 和 Observer 比手工實現要更加安全和簡潔。

要訂閱到一個 Observable,并不需要提供一個 Observer 示例。subscribe 函數有各種重載方法可以使用,你可以只訂閱 onNext 事件,有可以只訂閱 onError 事件,這樣就不用提供 Observer 對象就可以接受事件了。每次只需要提供你關心的函數即可,例如 如果你不關心 error 和完成事件,則只提供 onNext 來接收每次發送的數據即可。

配合 Java 8 的 Lambda 表達式 則使用起來代碼看起來會更加簡潔,所以本系列示例代碼會使用 lambda 表達式,如果你不了解的話,可以先看看 掌握 Java 8 Lambda 表達式

Subject

Subject 是 Observable 的一個擴展,同時還實現了 Observer 接口。第一眼看上去可能有點奇怪,但是在有些場合下使用 Subject 將會非常便捷。他們可以像 Observer 一樣接收事件,同時還可以像 Observable 一樣把接收到的事件再發射出去。這種特性非常適合 Rx 中的接入點,當你的事件來至于 Rx 框架之外的代碼的時候,你可以把這些數據先放到 Subject 中,然后再把 Subject轉換為一個 Observable,就可以在 Rx 中使用它們了。你可以把 Subject 當做 Rx 中的 事件管道。

Subject 有兩個參數類型:輸入參數和輸出參數。這樣設計是為了抽象而不是應為使用 Subject 是為了轉換數據類型。轉換數據應該使用轉換操作函數來完成,后面我們將介紹各種操作函數。

Subject 有各種不同的具體實現。下面將介紹一些非常重要的實現以及他們之間的區別。

PublishSubject

PublishSubject 是最直接的一個 Subject。當一個數據發射到 PublishSubject 中時,PublishSubject 將立刻把這個數據發射到訂閱到該 subject 上的所有 subscriber 中。

public static void main(String[] args) {
    PublishSubject<Integer> subject = PublishSubject.create();
    subject.onNext(1);
    subject.subscribe(System.out::println);
    subject.onNext(2);
    subject.onNext(3);
    subject.onNext(4);
}
 

輸出的結果:

2
3
4
 

上面的 System.out::println 是 Lambda 表達式中的函數引用 ,如果表達式代碼塊只有一個函數調用,則可以直接使用函數引用來簡化代碼

可以看到,數據 1 并沒有打印出來,原因是當我們訂閱到 subject 的時候,1 已經發射出去了。當訂閱到 subject 后就開始接收 發射到 subject 中的數據了。

這是我們初次看到如何使用 subscribe 函數,值得詳細研究下是如何用的。 這里我們使用了一個重載的參數只有一個 Function 類型。這個參數 Function 接收一個參數 Integer 并且沒有返回值。 沒有返回值的 Function 在 Rx 中被稱之為 action。 可以使用下面幾種方式來提供這個 Function:

  • 提供一個 Action1 的實現對象
  • 使用Lambda 表達式 實現
  • 使用符合該接口定義類型的 Lambda 表達式函數引用 。這里 System.out::println 函數可以接受一個 Object 對象,符合 Action 的定義(接受一個參數并沒有返回值),所以我們可以把該函數作為函數應用使用。 subscribe 將會使用他收到的值作為 println 函數的參數來調用 println 函數。

ReplaySubject

ReplaySubject 可以緩存所有發射給他的數據。當一個新的訂閱者訂閱的時候,緩存的所有數據都會發射給這個訂閱者。 由于使用了緩存,所以每個訂閱者都會收到所以的數據:

ReplaySubject<Integer> s = ReplaySubject.create();  
s.subscribe(v -> System.out.println("Early:" + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late: " + v)); 
s.onNext(2);
 

輸出結果:

Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2
 

不管是何時訂閱的,每個訂閱者都收到了所有的數據。注意后一個訂閱者在處理 2 之前就先收到了之前發射的數據 0和1.

緩存所有的數據并不是一個十分理想的情況,如果 Observable 事件流運行很長時間,則緩存所有的數據會消耗很多內存。可以限制緩存數據的數量和時間。 ReplaySubject.createWithSize 限制緩存多少個數據;而 ReplaySubject.createWithTime 限制一個數據可以在緩存中保留多長時間。

ReplaySubject<Integer> s = ReplaySubject.createWithSize(2); 
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v)); 
s.onNext(3);
 

結果:

Late: 1
Late: 2
Late: 3
 

由于指定只緩存兩個數據,所以當訂閱的時候第一個數據 0 就收不到了。 限制緩存的時間也是一樣的情況:

ReplaySubject<Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS,
                                                        Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v)); 
s.onNext(3);
 

結果:

Late: 1
Late: 2
Late: 3
 

使用時間緩存創建 ReplaySubject 需要指定一個 Scheduler, Scheduler 是 Rx 中保持時間的方式。現在可以假裝他不存在,不用關心他。

ReplaySubject.createWithTimeAndSize 則可以同時限制時間和個數。

BehaviorSubject

BehaviorSubject 只保留最后一個值。 等同于限制 ReplaySubject 的個數為 1 的情況。在創建的時候可以指定一個初始值,這樣可以確保黨訂閱者訂閱的時候可以立刻收到一個值。

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v)); 
s.onNext(3);
 

結果:

Late: 2
Late: 3
 

下面的示例只是打印出 Completed, 由于最后一個事件就是 Completed。

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
s.subscribe(
    v -> System.out.println("Late: " + v),
    e -> System.out.println("Error"),
    () -> System.out.println("Completed")
);
 

這里使用了 subscribe 函數的另外一種重載形式,接受三個參數。

下面使用了默認初始化值,如果訂閱者的發射數據之前就訂閱了,則會收到這個初始化的值:

BehaviorSubject<Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);
 

結果:

0
1
 

由于 BehaviorSubject 的定義就是總是有可用的數據,所以一般都會使用初始化值來創建 BehaviorSubject 。

AsyncSubject

AsyncSubject 也緩存最后一個數據。區別是 AsyncSubject 只有當數據發送完成時(onCompleted 調用的時候)才發射這個緩存的最后一個數據。可以使用 AsyncSubject 發射一個數據并立刻結束。

AsyncSubject<Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
 

結果:

2

如果上面的示例不調用 s.onCompleted(); 則什么結果都不會打印出來。

隱含的規則

Rx 中有一些隱含的規則在代碼中并不太容易看到。一個重要的規則就是當一個事件流結束(onError 或者 onCompleted 都會導致事件流結束)后就不會發射任何數據了。這些 Subject 的實現都遵守這個規則,subscribe 函數也拒絕違反該規則的情況。

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);
 

結果:

0

但是在 Rx 實現中并沒有完全確保這個規則,所以你在使用 Rx 的過程中要注意遵守該規則,否則會出現意料不到的情況。

來自: http://blog.chengyunfeng.com/?p=948

 本文由用戶 ylta2603 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!