RxJava初探
來自:http://codethink.me/2015/05/09/intro-of-rxjava/
0.前言
本文主要記錄了初步學習RxJava后的總結,希望用最短的篇幅講清楚RxJava的主要用法。部分內容來自Dan Lew的Grokking RxJava。
本文的示例代碼在這里。
1 基本概念
1.1 Rx結構
響應式編程的主要組成部分是observable, operator和susbscriber(與Dan Lew的文章不同,這里把Operator也做為組成部分介紹,這樣對結構的整體性會有更全面的認識)。
一般響應式編程的信息流如下所示:
Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber
也就是說,observable是事件的生產者,subscriber是事件最終的消費者。
因為subscriber通常在主線程中執行,因此設計上要求其代碼盡可能簡單,只對事件進行響應,而修改事件的工作全部由operator執行。
1.2 最簡單的模式
如果我們不需要修改事件,就不需要在observable和subscriber中插入operator。這時的Rx結構如下:
Obsevable -> Subscriber
這看起來很像設計模式中的觀察者模式,他們最重要的區別之一在于在沒有subscriber之前,observable不會產生事件。
一個簡單的RxJava HelloWorld的代碼如下。
// 創建observable Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava"); subscriber.onCompleted(); } }); // 創建subscriber Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { log(s); } }; // 訂閱 observable.subscribe(subscriber);
這里的代碼對于一句簡單的HelloWorld的而言太繁瑣了。因此,RxJava提供了一些簡化的方法。
首先是創建observable,如果我們只需要發送一個事件(這里的事件是字符串”Hellow RxJava”),我們可以使用Observable類的just方法,簡化后的代碼如下
// 創建observable Observable<String> observable = Observable.just("Hello RxJava");
同樣,如果我們不關心subscriber是否結束(onComplete())或者發生錯誤(onError()),subscriber的代碼可以簡化為
// 創建subscriber Action1<String> subscriber = new Action1<String>() { @Override public void call(String s) { log(s); } };
我們直接把創建和訂閱連接起來,完整的代碼如下。
Observable.just("Hello RxJava").subscribe(new Action1<String>() { @Override public void call(String s) { log(s); } });
最后,使用Java 8的lambda(Android上可以使用Retrolambda),這個HelloWorld的最終版本如下:
Observable.just("Hello RxJava") .subscribe(s -> log(s));
1.3 加入operator
很多時候,我們需要針對處理過的事件做出響應,而不僅僅是Observable產生的原始事件。由于1.1中闡述的原因,這里就需要引入operator來處理原始事件。
這里以一個極簡單的Markdown處理為例:假設輸入的是Markdown格式的文件,最終展示文字的是一個WebView,這里就需要引入一個將Markdown轉為HTML的operator,其代碼如下:
Observable.just("#Basic Markdown to HTML").map(new Func1<String, String>() { @Override public String call(String s) { if(s != null && s.startsWith("#")) { return "<h1>" + s.substring(1, s.length()) + "</h1>"; } return null; } }).subscribe(s -> log(s));
這里使用了名為map()的operator,它的作用很簡單,就是接收一個事件,并返回處理后的事件。Func1的第一個泛型參數表示輸入類型,第二個繁星參數表示返回類型。
我們這里同樣可以采用lambda來簡化代碼,簡化后的代碼如下:
Observable.just("#Basic Markdown to HTML with lambda") .map(s -> s != null && s.startsWith("#") ? "<h1>" + s.substring(1, s.length()) + "</h1>" : null) .subscribe(s -> log(s));
1.4 Subscription
前三小節有意隱藏了RxJava的一個細節,實際上執行Observable.subscribe()時,它會返回一個Subscrition,它代表了Observable和Subscriber之間的關系。你可以通過Subscrition解除Observable和Subscriber之間的訂閱關系,并立即停止執行整個訂閱鏈。示例代碼如下:
Subscription subscription = Observable.just("Unsubscribe me later").subscribe(s -> log(s)); subscription.unsubscribe(); log("isSubscribed = " + subscription.isUnsubscribed());
3 多線程
在開發過程中,為了避免阻塞UI線程,我們可能需要將某些工作放到指定線程執行。在RxJava中,你可以通過subscribeOn()來指定Observer的運行線程,通過observeOn()指定Subscriber的運行線程。這兩個方法都是operator,因此它們可以像所有operator那樣作用于任何的Observable。一個簡單的例子如下:
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { log("Observable on Thread -> " + Thread.currentThread().getName()); subscriber.onNext("MultiThreading"); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(s -> { log("Subscriber on Thread -> " + Thread.currentThread().getName()); });
4 錯誤處理
RxJava使用Subscriber的onError()進行錯誤處理。每一個Obervable的執行最后一定會調用onCompleted()和onError()方法中的一個。相比于傳統的回調處理錯誤的方式,訂閱鏈中任何時候出現的錯誤,都只需要在Subscriber的onError()方法中處理,operator不需要處理異常。
5 小結
相比于Otto這種總線式的處理方式,RxJava對于訂閱事件的處理更精細。同時,它還引入了許多函數式編程的特性,對于信息流處理有更好的解耦。目前只是通過閱讀以及一些玩具代碼初步了解了其用法,這僅僅是個開始。希望在實際項目中使用后,能有時間總結諸如自定義Operator等更多的高級用法。