RxJava初探

pjp 9年前發布 | 25K 次閱讀 RxJava Android開發 移動開發

來自:http://codethink.me/2015/05/09/intro-of-rxjava/

0.前言

本文主要記錄了初步學習RxJava后的總結,希望用最短的篇幅講清楚RxJava的主要用法。部分內容來自Dan Lew的Grokking RxJava

本文的示例代碼在這里

1 基本概念

1.1 Rx結構

響應式編程的主要組成部分是observable, operator和susbscriber(與Dan Lew的文章不同,這里把Operator也做為組成部分介紹,這樣對結構的整體性會有更全面的認識)。
一般響應式編程的信息流如下所示:

Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber

也就是說,observable是事件的生產者,subscriber是事件最終的消費者。

因為subscriber通常在主線程中執行,因此設計上要求其代碼盡可能簡單,只對事件進行響應,而修改事件的工作全部由operator執行。

1.2 最簡單的模式

如果我們不需要修改事件,就不需要在observable和subscriber中插入operator。這時的Rx結構如下:

Obsevable -> Subscriber

這看起來很像設計模式中的觀察者模式,他們最重要的區別之一在于在沒有subscriber之前,observable不會產生事件。

一個簡單的RxJava HelloWorld的代碼如下。

// 創建observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava");
        subscriber.onCompleted();
    }
});

// 創建subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }
    @Override
    public void onNext(String s) {
        log(s);
    }
};

// 訂閱
observable.subscribe(subscriber);


這里的代碼對于一句簡單的HelloWorld的而言太繁瑣了。因此,RxJava提供了一些簡化的方法。

首先是創建observable,如果我們只需要發送一個事件(這里的事件是字符串”Hellow RxJava”),我們可以使用Observable類的just方法,簡化后的代碼如下

// 創建observable
Observable<String> observable = Observable.just("Hello RxJava");


同樣,如果我們不關心subscriber是否結束(onComplete())或者發生錯誤(onError()),subscriber的代碼可以簡化為

// 創建subscriber
Action1<String> subscriber = new Action1<String>() {
    @Override
    public void call(String s) {
        log(s);
    }
};


我們直接把創建和訂閱連接起來,完整的代碼如下。

Observable.just("Hello RxJava").subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        log(s);
    }
});


最后,使用Java 8的lambda(Android上可以使用Retrolambda),這個HelloWorld的最終版本如下:

Observable.just("Hello RxJava")
    .subscribe(s -> log(s));


1.3 加入operator

很多時候,我們需要針對處理過的事件做出響應,而不僅僅是Observable產生的原始事件。由于1.1中闡述的原因,這里就需要引入operator來處理原始事件。

這里以一個極簡單的Markdown處理為例:假設輸入的是Markdown格式的文件,最終展示文字的是一個WebView,這里就需要引入一個將Markdown轉為HTML的operator,其代碼如下:

Observable.just("#Basic Markdown to HTML").map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        if(s != null && s.startsWith("#")) {
            return "<h1>" + s.substring(1, s.length()) + "</h1>";
        }
        return null;
    }
}).subscribe(s -> log(s));


這里使用了名為map()的operator,它的作用很簡單,就是接收一個事件,并返回處理后的事件。Func1的第一個泛型參數表示輸入類型,第二個繁星參數表示返回類型。

我們這里同樣可以采用lambda來簡化代碼,簡化后的代碼如下:

Observable.just("#Basic Markdown to HTML with lambda")
        .map(s -> s != null && s.startsWith("#") ? "<h1>" + s.substring(1, s.length()) + "</h1>" : null)
        .subscribe(s -> log(s));


1.4 Subscription

前三小節有意隱藏了RxJava的一個細節,實際上執行Observable.subscribe()時,它會返回一個Subscrition,它代表了Observable和Subscriber之間的關系。你可以通過Subscrition解除Observable和Subscriber之間的訂閱關系,并立即停止執行整個訂閱鏈。示例代碼如下:

Subscription subscription = Observable.just("Unsubscribe me later").subscribe(s -> log(s));
subscription.unsubscribe();
log("isSubscribed = " + subscription.isUnsubscribed());


3 多線程

在開發過程中,為了避免阻塞UI線程,我們可能需要將某些工作放到指定線程執行。在RxJava中,你可以通過subscribeOn()來指定Observer的運行線程,通過observeOn()指定Subscriber的運行線程。這兩個方法都是operator,因此它們可以像所有operator那樣作用于任何的Observable。一個簡單的例子如下:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        log("Observable on Thread -> " + Thread.currentThread().getName());
        subscriber.onNext("MultiThreading");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            log("Subscriber on Thread -> " + Thread.currentThread().getName());
        });


4 錯誤處理

RxJava使用Subscriber的onError()進行錯誤處理。每一個Obervable的執行最后一定會調用onCompleted()和onError()方法中的一個。相比于傳統的回調處理錯誤的方式,訂閱鏈中任何時候出現的錯誤,都只需要在Subscriber的onError()方法中處理,operator不需要處理異常。

5 小結

相比于Otto這種總線式的處理方式,RxJava對于訂閱事件的處理更精細。同時,它還引入了許多函數式編程的特性,對于信息流處理有更好的解耦。目前只是通過閱讀以及一些玩具代碼初步了解了其用法,這僅僅是個開始。希望在實際項目中使用后,能有時間總結諸如自定義Operator等更多的高級用法。

參考

Grokking RxJava

 本文由用戶 pjp 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!