RxJava 教程第四部分:并發 之線程調度

MireyaADMF 8年前發布 | 12K 次閱讀 RxJava 并發 Java開發

由于 Rx 目標是用在異步系統上并且 Rx 支持多線程處理,所以很多 Rx 開發者認為默認情況下 Rx 就是多線程的。 其實實際情況不是這樣的, Rx 默認是單線程的

除非你明確的指定線程,否則所有 onNext/onError/onCompleted 以及各個操作函數的調用都是在同一個線程中完成的。例如下面的示例:

final BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject.subscribe(i -> {
    System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
 
int[] i = {1}; // naughty side-effects for examples only ;)
Runnable r = () -> {
    synchronized(i) {
        System.out.println("onNext(" + i[0] + ") on " + Thread.currentThread().getId());
        subject.onNext(i[0]++);
    }
};
 
r.run(); // Execute on main thread
new Thread(r).start();
new Thread(r).start();
 

結果:

onNext(1) on 1
Received 1 on 1
onNext(2) on 11
Received 2 on 11
onNext(3) on 12
Received 3 on 12
 

上面在三個線程中分別調用 subject 的onNext 函數。和 Runnable 中的線程是同一個線程。不管用多少個操作函數串聯調用,結果都是同一個線程。

subscribeOn 和 observeOn

subscribeOn 和 observeOn 分別用來控制 subscription 的調用線程和 接受事件通知(Observer 的 onNext/onError/onCompleted 函數)的線程。

public final Observable<T> observeOn(Schedulerscheduler)
public final Observable<T> subscribeOn(Schedulerscheduler)
 

在Rx 中你并不直接和 線程 打交道,而是通過 Scheduler 來處理多線程。

subscribeOn

subscribeOn 用來指定 Observable.create 中的代碼在那個 Scheduler 中執行。即使你沒有調用 create 函數,但是內部也有一個 create 實現。例如:

System.out.println("Main: " + Thread.currentThread().getId());
 
Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    //.subscribeOn(Schedulers.newThread())
    .subscribe(i -> {
        System.out.println("Received " + i + " on " + Thread.currentThread().getId());
    });
 
System.out.println("Finished main: " + Thread.currentThread().getId());
 

結果:

Main: 1
Createdon 1
Received 1 on 1
Received 2 on 1
Finishedmain: 1
 

可以看到上面的代碼是在同一個線程中執行,并且是按循序執行的。subscribe 執行完后(包括create 函數里面的 Lambda 表達式的代碼)才繼續執行后面的代碼。

如果你把上面的注釋掉的代碼 .subscribeOn(Schedulers.newThread()) 啟用,這結果是這樣的:

Main: 1
Finishedmain: 1
Createdon 11
Received 1 on 11
Received 2 on 11
 

這樣 create 里面的 Lambda 表達式代碼將會在 Schedulers.newThread() 返回的線程中執行。subscribe 不再是阻塞的了。后面的代碼可以立即執行,而不用等待 subscribe 返回。

有些 Observable 內部會使用它們自己創建的線程。例如 Observable.interval 就是異步的。這種情況下,無需指定新的線程。

System.out.println("Main: " + Thread.currentThread().getId());
 
Observable.interval(100, TimeUnit.MILLISECONDS)
    .subscribe(i -> {
        System.out.println("Received " + i + " on " + Thread.currentThread().getId());
    });
 
System.out.println("Finished main: " + Thread.currentThread().getId());
 

結果:

Main: 1
Finishedmain: 1
Received 0 on 11
Received 1 on 11
Received 2 on 11
 

observeOn

observeOn 控制數據流的另外一端。你的 observer 如何收到事件。也就是在那個線程中回調 observer 的 onNext/onError/onCompleted 函數。

Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    .observeOn(Schedulers.newThread())
    .subscribe(i ->
        System.out.println("Received " + i + " on " + Thread.currentThread().getId()));
 

結果:

Createdon 1
Received 1 on 13
Received 2 on 13
 

observeOn 只影響調用該函數以后的操作函數。你可以認為 observeOn 只是攔截了數據流并且對后續的操作有作用。例如:

Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    .doOnNext(i -> 
        System.out.println("Before " + i + " on " + Thread.currentThread().getId()))
    .observeOn(Schedulers.newThread())
    .doOnNext(i -> 
        System.out.println("After " + i + " on " + Thread.currentThread().getId()))
    .subscribe();
 

結果:

Createdon 1
Before 1 on 1
Before 2 on 1
After 1 on 13
After 2 on 13
 

可以看到在遇到 observeOn 之前,所有的操作發生在一個線程,之后在另外一個線程。這樣可以在 Rx 數據流中不同地方設置不同的線程。

如果你知道數據流處理在那些情況需要很長時間,則可以通過這個操作來避免阻塞生產者線程。 比如在 Android 開發過程中的 UI 線程,如果在該線程中讀取文件,可能會導致 UI 卡死(ANR)無響應,通過該函數可以指定讀取文件在另外一個線程中執行。

unsubscribeOn

有些 Observable 會依賴一些資源,當該 Observable 完成后釋放這些資源。如果釋放資源比較耗時的話,可以通過 unsubscribeOn 來指定 釋放資源代碼執行的線程。

Observable<Object> source = Observable.using(
    () -> {
        System.out.println("Subscribed on " + Thread.currentThread().getId());
        return Arrays.asList(1,2);
    },
    (ints) -> {
        System.out.println("Producing on " + Thread.currentThread().getId());
        return Observable.from(ints);
    },
    (ints) -> {
        System.out.println("Unubscribed on " + Thread.currentThread().getId());
    }
);
 
source
    .unsubscribeOn(Schedulers.newThread())
    .subscribe(System.out::println);
 

結果:

Subscribedon 1
Producingon 1
1
2
Unubscribedon 11
 

Schedulers

observeOn 和 subscribeOn 的參數為一個 Scheduler 對象。Scheduler 是用來協調任務執行的。 RxJava 包含了一些常用的 Scheduler,你也可以自定義 Scheduler。 通過調用 Schedulers 的工廠函數來獲取標準的預定義的 Scheduler。

RxJava 內置的 Scheduler 有:

– immediate 同步執行

– trampoline 把任務放到當前線程的隊列中,等當前任務執行完了,再繼續執行隊列中的任務

– newThread 對于每個任務創建一個新的線程去執行

– computation 計算線程,用于需要大量 CPU 計算的任務

– io 用于執行 io 操作的任務

– test 用于測試和調試

當前 computation 和 io 的實現是類似的,他們兩個主要用來確保調用的場景,相當于文檔說明,來表明你當前的任務是何種類型的。

大部分的 Rx 操作函數內部都使用了schedulers 。并且大部分的 Observable 操作函數也都有一個使用 Scheduler 參數的重載函數。通過重載函數可以指定該操作函數執行的線程。

scheduler 的高級特性

Rx scheduler 的使用場景并沒有限定在 Rx 中,也可以在普通 Java 代碼中使用。

執行一個任務

Scheduler 有個 createWorker 函數,用來創建一個可以執行的任務( Scheduler.Worker )。然后可以調度該任務:

Scheduler.Workerworker = scheduler.createWorker();
worker.schedule(
    () -> System.out.println("Action"));
 

上面的任務被分配到其指定的線程中了。

還可以重復執行任務,或者只執行一次,也可以推遲任務執行:

Subscriptionschedule(
    Action0action,
    long delayTime,
    java.util.concurrent.TimeUnitunit)
SubscriptionschedulePeriodically(
    Action0action,
    long initialDelay,
    long period,
    java.util.concurrent.TimeUnitunit)
 
Schedulerscheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Workerworker = scheduler.createWorker();
worker.schedule(
    () -> System.out.println(System.currentTimeMillis()-start),
    5, TimeUnit.SECONDS);
worker.schedule(
    () -> System.out.println(System.currentTimeMillis()-start),
    5, TimeUnit.SECONDS);
 

結果:

5033
5035
 

上面示例中可以看到,推遲執行是從調度開始的時候計算時間的。

取消任務

Scheduler.Worker 繼承至 Subscription。調用 unsubscribe 函數可以取消隊列中的任務:

Schedulerscheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Workerworker = scheduler.createWorker();
worker.schedule(
    () -> {
        System.out.println(System.currentTimeMillis()-start);
        worker.unsubscribe();
    },
    5, TimeUnit.SECONDS);
worker.schedule(
    () -> System.out.println(System.currentTimeMillis()-start),
    5, TimeUnit.SECONDS);
 

結果:

5032
 

第一個任務中調用了 unsubscribe,這樣第二個任務被取消了。下面一個示例演示任務沒有執行完,被取消的情況,會拋出一個 InterruptedException 異常:

Schedulerscheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Workerworker = scheduler.createWorker();
worker.schedule(() -> {
    try {
        Thread.sleep(2000);
        System.out.println("Action completed");
    } catch (InterruptedException e) {
        System.out.println("Action interrupted");
    }
});
Thread.sleep(500);
worker.unsubscribe();
 

結果:

Actioninterrupted
 

schedule 返回的是一個 Subscription 對象,可以在該對象上調用取消操作,這樣可以只取消這一個任務,而不是取消所有任務。

RxJava 中現有的 scheduler

ImmediateScheduler

ImmediateScheduler 并沒有做任何線程調度。只是同步的執行任務。嵌套調用會導致任務被遞歸執行:

Schedulerscheduler = Schedulers.immediate();
Scheduler.Workerworker = scheduler.createWorker();
worker.schedule(() -> {
    System.out.println("Start");
    worker.schedule(() -> System.out.println("Inner"));
    System.out.println("End");
});
 

結果:

Start
Inner
End
 

TrampolineScheduler

TrampolineScheduler 也是同步執行,但是不嵌套任務。而是把后來的任務添加到任務隊列中,等前面的任務執行完了 再執行后面的。

Schedulerscheduler = Schedulers.trampoline();
Scheduler.Workerworker = scheduler.createWorker();
worker.schedule(() -> {
    System.out.println("Start");
    worker.schedule(() -> System.out.println("Inner"));
    System.out.println("End");
});
 

結果:

Start
End
Inner
 

TrampolineScheduler 把任務安排到第一次執行任務的那個線程中執行。這樣,第一次調用 schedule 的操作是阻塞的,直到隊列執行完。后續的任務,會在這個線程中一個一個的執行,并且后續的調用不會阻塞。

NewThreadScheduler

NewThreadScheduler 給每個任務創建一個新的線程。

定義一個打印線程信息的輔助函數:

public static void printThread(String message) {
    System.out.println(message + " on " + Thread.currentThread().getId());
}
 

示例:

printThread("Main");
Schedulerscheduler = Schedulers.newThread();
Scheduler.Workerworker = scheduler.createWorker();
worker.schedule(() -> {
    printThread("Start");
    worker.schedule(() -> printThread("Inner"));
    printThread("End");
});
Thread.sleep(500);
worker.schedule(() -> printThread("Again"));
 

結果:

Mainon 1
Starton 11
End on 11
Inneron 11
Againon 11
 

來自: http://blog.chengyunfeng.com/?p=978 

 本文由用戶 MireyaADMF 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!