RxJava 入門

jopen 9年前發布 | 25K 次閱讀 RxJava Java開發

什么是 ReactiveX?

ReactiveX 是一個專注于異步編程與控制可觀察數據(或者事件)流的API。它組合了觀察者模式,迭代器模式和函數式編程的優秀思想。

實時數據處理是一件普通的現象,有一個高效、干凈和可擴展的方式來處理這些情景是重要的。使用 Observables 和 Operators 來熟練操作它們。ReactiveX 提供一個可組合又靈活的 API 來創建和處理數據流,同時簡化了異步編程帶來的一些擔憂,如:線程創建和并發問題。

RxJava 簡介

RxJava 是 ReactiveX 在 Java 上的開源的實現。Observable(觀察者) 和 Subscriber(訂閱者)是兩個主要的類。在 RxJava 上,一個 Observable 是一個發出數據流或者事件的類,Subscriber 是一個對這些發出的 items (數據流或者事件)進行處理(采取行動)的類。一個 Observable 的標準流發出一個或多個 item,然后成功完成或者出錯。一個 Observable 可以有多個 Subscribers,并且通過 Observable 發出的每一個 item,該 item 將會被發送到 Subscriber.onNext() 方法來進行處理。一旦 Observable 不再發出 items,它將會調用 Subscriber.onCompleted() 方法,或如果有一個出錯的話 Observable 會調用 Subscriber.onError() 方法。

現在,我們知道了很多關于 Observable 和 Subscriber 類,我們可以繼續去介紹有關 Observables 的創建和訂閱。

Observable integerObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(1);
       subscriber.onNext(2);
       subscriber.onNext(3);
       subscriber.onCompleted();
   }
});  

這個 Observable 發出了整數 1,2,3 然后結束了。現在我們需要創建一個 Subscriber,那樣我們就能讓這些發出的流起作用。

Subscriber integerSubscriber = new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }
   @Override
   public void onError(Throwable e) {
   }
   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
};  

我們的 Subscriber 只是簡單的把任何發出的 items 打印出來,完成之后通知我們。一旦你有一個 Observable 和一個 Subscriber,可以通過 Observable.subscribe() 方法將他們聯系起來。

integerObservable.subscribe(integerSubscriber);
// Outputs:
// onNext: 1
// onNext: 2
// onNext: 3
// Complete! 

上面所有這些代碼可以簡單的通過使用 Observable.just() 方法來創建一個 Observable 去發出這些定義的值,并且我們的 Subscriber 可以改變成匿名的內部類,如下:

Observable.just(1, 2 ,3).subscribe(new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }
   @Override
   public void onError(Throwable e) {}
   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
});  

操作符

創建和訂閱一個 Observable 是足夠簡單的,可能這并不是非常有用的,但這只是用 RxJava 的一個開始。通過調用操作符,任何的 Observable 都能進行輸出轉變,多個 Operators 能鏈接到 Observable 上。例如,在我們剛才的 Observable 中,我們只想要收到奇數的數字。要做到這一點,我可以使用 filter() 操作符。

Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }
           @Override
           public void onError(Throwable e) {
           }
           @Override
           public void onNext(Integer value) {
               System.out.println("onNext: " + value);
           }
       });
// Outputs:
// onNext: 1
// onNext: 3
// onNext: 5
// Complete!  

我們的 filter() 操作符定義了一個方法,將取出我們發出的整數,并對所有的奇數返回為 true,所有的偶數返回為 false。從我們的 filter() 返回為 false 的值是不會發出到 Subscriber 的,我們也不會在輸出中看到他們。注意:filter() 操作符返回的是一個 Observable,這樣我們的訂閱方式就可以像之前的做法那樣了。

現在,我想找到發出的這些奇數的平方根,一種方法是在調用我們的 Subscriber 的每一個 onNext() 去計算平方根。然而,如果我們在我們的 Subscriber 中做計算平方根的操作的話,這樣得到期望可能就不能進一步實現的數據的流式轉換了。要做到這一點,我們可以在 filter() 操作符上鏈上 map() 操作符。

Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .map(new Func1() {
           @Override
           public Double call(Integer value) {
               return Math.sqrt(value);
           }
       })
       .subscribe(new Subscriber() { // notice Subscriber type changed to 
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }
           @Override
           public void onError(Throwable e) {
           }
           @Override
           public void onNext(Double value) {
               System.out.println("onNext: " + value);
           }
       });
// Outputs:
// onNext: 1.0
// onNext: 1.7320508075688772
// onNext: 2.23606797749979
// Complete!  

操作符的鏈式使用是構成 RxJava 必不可少的一部分,讓你可以靈活的實現任何你想要的需求。隨著對于 Observables 和 Operators 相互作用的理解,我們可以進入下一個話題:RxJava 和 Android 的整合。

讓 Android 中的線程操作變得簡單

在 Android 開發中有一個常見的場景是需要在后臺線程去分擔一定量的工作,一旦該任務完成,會將結果回調到主線程去顯示結果。

在 Android 中,我們有多種方法來做這樣的事:用 AsyncTasks,Loaders,Services 等。然而,這些解決方式通常不是最好的。Asynctasks 很容易導致內存泄露,CursorLoaders 與 ContentProvider 需要大量的配置和設置樣板代碼,還有 Services 的目的是為了長時間在后臺運營的,而不是處理快速完成的操作,如:做一個網絡請求或者從數據庫加載內容。

讓我們看看 RxJava 是怎么幫我們解決這些問題的。下面這樣的布局有一個按鈕去開始一個長時間運行的操作,并且始終顯示進度條,這樣我們可以確保我們的操作是運行在后臺線程的而不是在主線程。

<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
   xmlns:app="http://schemas.android.com/apk/res-auto"
   android:id="@+id/root_view"
   android:layout_width="match_parent"
   android:layout_height="match_parent"
   android:fitsSystemWindows="true"
   android:orientation="vertical">
   <android.support.v7.widget.Toolbar
       android:id="@+id/toolbar"
       android:layout_width="match_parent"
       android:layout_height="?attr/actionBarSize"
       android:background="?attr/colorPrimary"
       app:popupTheme="@style/AppTheme.PopupOverlay"
       app:theme="@style/ThemeOverlay.AppCompat.Dark.ActionBar" />
   <Button
       android:id="@+id/start_btn"
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:layout_gravity="center_horizontal"
       android:text="@string/start_operation_text" />
   <ProgressBar
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:layout_gravity="center_horizontal"
       android:indeterminate="true" />
</LinearLayout>  

一旦按鈕被點擊,它會禁用按鈕并開啟長時間運行的操作,并且一旦這個操作完成便會顯示一個 Snackbar,然后按鈕會重新變得可點擊。這里是一個用 AsyncTask 實現我們這個“長期運行的操作”的例子。這個按鈕只是 new 了一個 SampleAsyncTask 并 executes 了它。

public String longRunningOperation() {
   try {
       Thread.sleep(2000);
   } catch (InterruptedException e) {
       // error
   }
   return "Complete!";
}
private class SampleAsyncTask extends AsyncTask {
   @Override
   protected String doInBackground(Void... params) {
       return longRunningOperation();
   }
   @Override
   protected void onPostExecute(String result) {
       Snackbar.make(rootView, result, Snackbar.LENGTH_LONG).show();
       startAsyncTaskButton.setEnabled(true);
   }
}  

現在,我們如何將這個 AsyncTask 用 RxJava 來實現呢?首先,我們需要添加以下內容到我們 app 的 gradle build 文件下:compile 'io.reactivex:rxjava:1.0.14'。然后我們需要創建一個 Observable 來調用我們這個長時間運行的操作。這可以使用 Observable.create() 方法來做到。

final Observable operationObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(longRunningOperation());
       subscriber.onCompleted();
   }
});  

我們創建了 Observable 將會調用 longRunningOperation() 方法,將返回的結果作為參數給 onNext() 方法,然后調用 onCompleted() 來完成 Observable (注:在我們的 Observable 去訂閱之前,我們的操作是不會被調用的)。接下來,當 button 被點擊時,我們需要給我們的 Observable 做訂閱。我添加了一個新的 button 用 RxJava 版本來處理我們的任務。

startRxOperationButton.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(final View v) {
       v.setEnabled(false);
       operationObservable.subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               v.setEnabled(true);
           }
           @Override
           public void onError(Throwable e) {}
           @Override
           public void onNext(String value) {
               Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show();
           }
       });
   }
});  

現在當我們建立應用程序時,然后點擊新 button 時,會發生什么?我們的進度顯示會凍結,然后我們 UI 變得反應遲鈍。這是因為我們還沒有定義我們的 Observable 應該在什么線程上,以及我們應該在什么線程去訂閱它。這是 RxJava 的 Schedulers(調度器) 功能。

對于任何 Observable 你可以定義在兩個不同的線程,Observable 會操作在它上面。使用 Observable.observeOn() 可以定義在一個線程上,可以用來監聽和檢查從 Observable 最新發出的 items (Subscriber 的 onNext,onCompleted 和 onError 方法會執行在 observeOn 所指定的線程上),并使用 Observable.subscribeOn() 來定義一個線程,將其運行我們 Observable 的代碼(長時間運行的操作)。

RxJava 默認情況下是單線程的,你會需要利用 observeOn() 和 subscribeOn() 方法為你的應用帶來多線程操作。RxJava 附帶了幾個現成的 Schedulers 給 Observables 使用,如:Schedulers.io() (用于 I/O 操作),Schedulers.computation()(計算工作),和 Schedulers.newThread()(為任務創建的新線程)。然而,從 Android 的角度來看,你可能想知道如何把訂閱代碼執行到主線程。我們可以用 RxAndroid 庫來實現這一目標。

RxAndroid 是一個對 RxJava 的輕量級擴展為了 Android 的主線程提供 Scheduler,也能去創建一個 Scheduler 用于運行在任何給定的 Android Handler 類上。用新的 Schedulers,Observable 創建之前能讓我們將其修改為在后臺線程執行我們的任務,并將我們的結果推到主線程上。

要在 APP 中用 RxAndroid,只要在 gradle build 文件中添加這行代碼就行了:compile 'io.reactivex:rxandroid:1.0.1'。

final Observable operationObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(longRunningOperation());
       subscriber.onCompleted();
   }
})
       .subscribeOn(Schedulers.io()) // subscribeOn the I/O thread
       .observeOn(AndroidSchedulers.mainThread()); // observeOn the UI Thread 

我們修改 Observable 將用 Schedulers.io() 去訂閱,并用 AndroidSchedulers.mainThread() 方法將觀察的結果返回到 UI 線程上 。現在,當我們建立我們的 APP 并點擊我們的 Rx 操作的按鈕,我們可以看到當操作運行時它將不再阻塞 UI 線程。

所有上述的例子利用了 Observable 類來發出我們的結果,當一個操作僅僅只需要發出一個結果然后就完成的情況我們可以有另外一個選擇。RxJava 發布的 1.0.13 版本介紹了 Single 類。Single 類可以用于創建像下面這樣的方法:

Subscription subscription = Single.create(new Single.OnSubscribe() {
           @Override
           public void call(SingleSubscriber singleSubscriber) {
               String value = longRunningOperation();
               singleSubscriber.onSuccess(value);
           }
       })
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Action1() {
           @Override
           public void call(String value) {
               // onSuccess
               Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show();
           }
       }, new Action1() {
           @Override
           public void call(Throwable throwable) {
               // handle onError
           }
       });  

當給一個 Single 類做訂閱時,只有一個 onSuccess 的 Action 和 onError 的 action。Single 類有不同于 Observable 的操作符,有幾個操作符具有將 Single 轉換到 Observable 的機制。例如:用 Single.mergeWith() 操作符,兩個或更多同類型的 Singles 可以合并到一起去創建一個 Observable,發出每個 Single 的結果給一個 Observable。

防止內存泄露

對于 AsyncTasks 所提到的缺點是,如果對于涉及了 Activity 或 Fragment 的處理不仔細的話,AsyncTasks 可能會造成內存泄露。不幸的是,使用 RxJava 不會魔術般的緩解內存泄露危機,但是防止內存泄露是很簡單的。

如果你一直在關注代碼,你可能會注意到你調用的 Observable.subscribe() 的返回值是一個 Subscription 對象。Subscription 類只有兩個方法,unsubscribe() 和 isUnsubscribed()。為了防止可能的內存泄露,在你的 Activity 或 Fragment 的 onDestroy 里,用 Subscription.isUnsubscribed() 檢查你的 Subscription 是否是 unsubscribed。如果調用了 Subscription.unsubscribe() ,Unsubscribing將會對 items 停止通知給你的 Subscriber,并允許垃圾回收機制釋放對象,防止任何 RxJava 造成內存泄露。如果你正在處理多個 Observables 和 Subscribers,所有的 Subscription 對象可以添加到 CompositeSubscription,然后可以使用 CompositeSubscription.unsubscribe() 方法在同一時間進行退訂(unsubscribed)。

寫在最后

RxJava 在 Android 生態系統中提供非常棒的多線程選項。讓我們能輕松的去后臺線程做操作,然后將結果推到 UI 線程上。這對于任何 Android 應用來說都是非常需要的功能,能夠運用 RxJava 的眾多操作符來處理任何操作的結果僅僅是為了創造更多的附加值。然而 RxJava 要求我們對這個庫有更好的了解,充分利用其功能,所花費在這個庫上的時間就能讓你帶來更大的回報。

這篇博客并未涉及 RxJava 的更進一步的主題:熱觀察 vs 冷觀察、處理 backpressure、 Rx 的 Subject 類。用 RxJava 替代 AsyncTask 所涉及的示例代碼可以在 Github 上找到。

番外:Retrolambda

Java 8 引入了 Lambda 表達式,遺憾的是 Android 并不支持 Java 8,所以我們不能在 RxJava 中利用這一特性。幸運的是,有一個名為 Retrolambda 的庫反向移植了 Lambda 表達式到 Java 的早期版本。還有提供有一個 Retrolambda 的 gradle 插件 ,能讓我們在 Android 應用中去使用 Lambda。

對于 Lambda,可以簡化 Observable 和 Subscriber 的代碼,如:

final Observable operationObservable = Observable.create(
       (Subscriber subscriber) -> {
           subscriber.onNext(longRunningOperation());
           subscriber.onCompleted();
       })
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread());
startRxOperationButton = (Button) findViewById(R.id.start_rxjava_operation_btn);
startRxOperationButton.setOnClickListener(v -> {
   v.setEnabled(false);
   operationObservable.subscribe(
           value -> Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show(),
           error -> Log.e("TAG", "Error: " + error.getMessage()),
           () -> v.setEnabled(true));
});  

Lambda 表達式為 RxJava 減少了很多代碼,我會強烈建議你用上 Retrolambda 的。它甚至比 RxJava 還要好用(setOnClickListener方法同樣可以使用 Retrolambda)。

原文  http://mrfu.me/android/2015/11/11/Getting_Started_with_RxJava_and

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!