RxJava 教程第二部分:創建事件流

fjdsaj 8年前發布 | 13K 次閱讀 RxJava 工廠模式 Java開發

 

現在你應該了解 Rx 的概念了,是時候開始創建和操作事件流了。操作事件流的原始實現是基于 C# 的 LINQ ,而 LINQ 是受到 functional programming 啟發的。如果你了解 LINQ 更容易理解本節內容, 如果不了解也沒關系。我們將從最簡單的內容開始介紹。 大部分的 Rx 操作函數(operators )用來操作已經存在的事件流。在介紹操作函數之前,先來看看如何創建一個 Observable。

創建一個事件流

在第一部分示例中,我們使用 Subject 來手工的把數據推送給他并創建一個事件流。我們使用這種方式來示例一些核心的概念和Rx 中一些核心的函數 subscribe。大部分時候使用 Subject 都不是創建 Observable 的最佳方式。 下面將介紹如何創建 Observable 來發射事件流。

簡單的工廠方法

Observable 有很多工廠方法可以創建一個事件流。

Observable.just

just 函數創建一個發射預定義好的數據的 Observable ,發射完這些數據后,事件流就結束了。

Observable<String> values = Observable.just("one", "two", "three");
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);  
 

結果:

Received: one
Received: two
Received: three
Completed
 

Observable.empty

這個函數創建的 Observable 只發射一個 onCompleted 事件就結束了。

Observable<String> values = Observable.empty();
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

結果:

Completed
 

Observable.never

這個 Observable 將不會發射任何事件和數據。

Observable<String> values = Observable.never();
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

上面的代碼不會打印任何東西。但是這個代碼并沒有阻塞住,實際上上面的代碼立刻就執行完了。

Observable.error

這個 Observable 將會發射一個 error 事件,然后結束。

Observable<String> values = Observable.error(new Exception("Oops"));
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

結果:

Error: java.lang.Exception: Oops
 

Observable.defer

defer 并沒有定義一個新的 Observable, defer 只是用來聲明當 Subscriber 訂閱到一個 Observable 上時,該 Observable 應該如何創建。例如,如果我們想創建一個發射當前時間然后就結束的 Observable, 發射一個數據然后結束,看起來用 just 實現即可:

Observable<Long> now = Observable.just(System.currentTimeMillis());
 
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);
 

結果:

1431443908375
1431443908375
 

注意上面兩個 subscriber 相隔 1秒訂閱這個 Observable,但是他們收到的時間數據是一樣的!這是因為當訂閱的時候,時間數據只調用一次。其實你希望的是,當 一個 subscriber 訂閱的時候才去獲取當前的時間。 defer 的參數是一個返回一個 Observable 對象的函數。該函數返回的 Observable 對象就是 defer 返回的 Observable 對象。 重點是,每當一個新的 Subscriber 訂閱的時候,這個函數就重新執行一次。

Observable<Long> now = Observable.defer(() ->
        Observable.just(System.currentTimeMillis()));
 
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);
 

結果:

1431444107854
1431444108858
 

Observable.create

create 是非常強大的一個函數。可以創建任何你需要的 Observable。

static <T> Observable<T> create(Observable.OnSubscribe<T> f)
 

上面是 create 函數的定義,參數 Observable.OnSubscribe 看起來很簡單。OnSubscribe 只有一個函數其參數為 Subscriber 。在該函數內我們可以手工的發射事件和數據到 subscriber。

Observable<String> values = Observable.create(o -> {
    o.onNext("Hello");
    o.onCompleted();
});
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

結果:

Received: Hello
Completed
 

當有 Subscriber 訂閱到這個 Observable 時(上面示例中的 values ),這個 Subscriber 對象就是你實現的函數中的參數 Subscriber。然后你可以在你的代碼中把數據發射到這個 subscriber 中。注意,當數據發射完后,你需要手工的調用 onCompleted 來表明發射完成了。

如果之前的所有方法都不滿足你的要求時,這個函數應當作為你創建自定義 Observable 的最佳方式。其實現方式和 第一部分我們通過 Subject 來發射事件類似,但是有幾點非常重要的區別。首先:數據源被封裝起來了,并和不相關的代碼隔離開了。其次:Subject 有一些不太明顯的問題,通過使用 Subject 你自己在管理狀態,并且任何訪問該 Subject 對象的人都可以往里面發送數據然后改變事件流。

還一個主要的區別是執行代碼的時機,使用 create 創建的 Observable,當 Observable 創建的時候,你的函數還沒有執行,只有當有 Subscriber 訂閱的時候才執行。這就意味著每次當有 Subscriber 訂閱的時候,該函數就執行一次。和 defer 的功能類似。結果和 ReplaySubject 類似, ReplaySubject 會緩存結果 當有新的 Subscriber 訂閱的時候,把緩存的結果在發射給新的 Subscriber。如果要使用 ReplaySubject 來實現和 create 類似的功能,如果 create 中創建數據的函數是阻塞的話,則 ReplaySubject 在創建的時候線程會阻塞住知道 創建函數執行完。如果不想阻塞當前線程的話,則需要手工創建一個線程來初始化數據。其實 Rx 有更加優雅的方式解決這個問題。

其實使用 Observable.create 可以實現 前面幾個工廠方法的功能。比如 上面的 create 函數的功能和 Observable.just(“hello”) 的功能是一樣的。

Functional unfolds

在 functional programming(函數式編程)中,創建一系列數字是非常常見的。 RxJava 也提供了一些工廠方法來創建這樣的序列。

Observable.range

做過函數式編碼的程序員都了解這個函數的意思。 該函數發射一個整數序列:

Observable<Integer> values = Observable.range(10, 15);
 

上面示例將生成一個從 10 到 24 的數字序列(從 10 開始,發射 15個數字)。

Observable.interval

創建一個無限的計時序列,每隔一段時間發射一個數字,從 0 開始:

Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
System.in.read();
 

結果:

Received: 0
Received: 1
Received: 2
Received: 3
...
 

如果我們不調用 unsubscribe 的話,這個序列是不會停止的。

上面的代碼在最后有個 System.in.read(); 阻塞語句,這個語句是有必要的,不然的話,程序不會打印任何內容就退出了。原因是我們的操作不是阻塞的:我們創建了一個每隔一段時間就發射數據的 Observable,然后我們注冊了一個 Subscriber 來打印收到的數據。這兩個操作都是非阻塞的,而 發射數據的計時器是運行在另外一個線程的,但是這個線程不會阻止 JVM 結束當前的程序,所以 如果沒有 System.in.read(); 這個阻塞操作,還沒發射數據則程序就已經結束運行了。

Observable.timer

Observable.timer 有兩個重載函數。第一個示例創建了一個 Observable, 該 Observable 等待一段時間,然后發射數據 0 ,然后就結束了。

Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

結果:

Received: 0
Completed
 

另外一個示例是,先等待一段時間,然后開始按照間隔的時間一直發射數據:

Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

結果:

Received: 0
Received: 1
Received: 2
...
 

上面的示例,先等待 2秒,然后每隔一秒開始發射數據。

轉換為 Observable

已經有很多工具來處理序列數據、集合和異步事件了,但是這些工具可能無法直接在 Rx 中使用。下面來介紹幾個方法可以把這些工具產生的結果轉換到你的 Rx 代碼中。

如果你在使用類似 JavaFX 中的異步事件處理,則可以使用 Observable.create 把異步事件轉換到 Observable 中:

Observable<ActionEvent> events = Observable.create(o -> {
    button2.setOnAction(new EventHandler<ActionEvent>() {
        @Override public void handle(ActionEvent e) {
            o.onNext(e)
        }
    });
})
 

根據事件的不同,事件的類型(上面的示例中事件類型為 ActionEvent)可能適合作為 Observable 發射數據的類型。 如果你需要的是該事件類型的屬性(比如事件發射的位置),則獲取該屬性并把獲取到的結果發射給最終的 Subscriber ,Subscriber 接受到結果可能和事件發生時的數據不一樣。為了避免這種問題,在 Observable 中傳遞的數據應該保持其狀態不變。

Observable.from

在 Java 并發框架中經常使用 Future 來獲取異步結果。 通過使用 from 可以把 Future 的結果發射到 Observable 中:

FutureTask<Integer> f = new FutureTask<Integer>(() -> {
    Thread.sleep(2000);
    return 21;
});
new Thread(f).start();
 
Observable<Integer> values = Observable.from(f);
 
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

結果:

Received: 21
Completed
 

當 FutureTask 執行完后, Observable 發射 Future 獲取到的結果然后結束。如果任務 取消了,則 Observable 會發射一個 java.util.concurrent.CancellationException 錯誤信息。

你還可以對 Future 設置超時時間:

Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
 

當過了超時時間后, Future 還是沒有返回結果, Observable 可以忽略其結果并發射一個 TimeoutException。

Observable.from 還有重載的函數可以用一個數據集合或者一個可以遍歷的iterable 的數據來生成一個 Observable。逐個發射集合中的數據,最后發射一個 onCompleted 事件。

Integer[] is = {1,2,3};
Observable<Integer> values = Observable.from(is);
Subscriptionsubscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
 

結果:

Received: 1
Received: 2
Received: 3
Completed
 

Observable 和 Iterable 或者 Stream 是不能通用的。Observables 是基于 push 模型的,而 Iterable 是基于 pull 模型的。

來自: http://blog.chengyunfeng.com/?p=959

 本文由用戶 fjdsaj 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!