理解RxJava的線程模型

MicEdinburg 7年前發布 | 10K 次閱讀 線程 RxJava

ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,最初是LINQ的一個擴展,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流,Rx庫支持.NET、JavaScript和C++,Rx近幾年越來越流行了,現在已經支持幾乎全部的流行編程語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社區網站是 reactivex.io

Netflix參考微軟的Reactive Extensions創建了Java的實現RxJava,主要是為了簡化服務器端的并發。2013年二月份,Ben Christensen 和 Jafar Husain發在Netflix技術博客的一篇文章第一次向世界展示了RxJava。

RxJava也在Android開發中得到廣泛的應用。

ReactiveX

An API for asynchronous programming with observable streams.

A combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

雖然RxJava是為異步編程實現的庫,但是如果不清楚它的使用,或者錯誤地使用了它的線程調度,反而不能很好的利用它的異步編程提到系統的處理速度。本文通過實例演示錯誤的RxJava的使用,解釋RxJava的線程調度模型,主要介紹 Scheduler 、 observeOn 和 subscribeOn 的使用。

本文中的例子以并發發送http request請求為基礎,通過性能檢驗RxJava的線程調度。

第一個例子,性能超好?

我們首先看第一個例子:

public static void testRxJavaWithoutBlocking(int count) throws Exception {
    CountDownLatch finishedLatch = new CountDownLatch(1);
    long t = System.nanoTime();
    Observable.range(0, count).map(i -> {
        //System.out.println("A:" + Thread.currentThread().getName());
        return 200;
    }).subscribe(statusCode -> {
        //System.out.println("B:" + Thread.currentThread().getName());
    }, error -> {
    }, () -> {
        finishedLatch.countDown();
    });
    finishedLatch.await();
    t = (System.nanoTime() - t) / 1000000; //ms
    System.out.println("RxJavaWithoutBlocking TPS: " + count * 1000 / t);
}

這個例子是一個基本的RxJava的使用,利用Range創建一個Observable, subscriber處理接收的數據。因為整個邏輯沒有阻塞,程序運行起來很快,

輸出結果為:

RxJavaWithoutBlocking TPS: 7692307

2 加上業務的模擬,性能超差

上面的例子是一個理想化的程序,沒雨任何阻塞。我們模擬一下實際的應用,加上業務處理。

業務邏輯是發送一個http的請求,httpserver是一個模擬器,針對每個請求有30毫秒的延遲。subscriber統計請求結果:

public static void testRxJavaWithBlocking(int count) throws Exception {
        URL url = new URL("http://127.0.0.1:8999/");
        CountDownLatch finishedLatch = new CountDownLatch(1);
        long t = System.nanoTime();
        Observable.range(0, count).map(i -> {
            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                int responseCode = conn.getResponseCode();
                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    //response.append(inputLine);
                }
                in.close();
                return responseCode;
            } catch (Exception ex) {
                return -1;
            }
        }).subscribe(statusCode -> {
        }, error -> {
        }, () -> {
            finishedLatch.countDown();
        });
        finishedLatch.await();
        t = (System.nanoTime() - t) / 1000000; //ms
        System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);
    }

運行結果如下:

RxJavaWithBlocking TPS: 29。

@#¥%%……&!

性能怎么突降呢,第一個例子看起來性能超好啊,http server只增加了一個30毫秒的延遲,導致這個方法每秒只能處理29個請求。

如果我們估算一下, 29*30= 870 毫秒,大約1秒,正好和單個線程發送處理所有的請求的TPS差不多。

后面我們也會看到,實際的確是一個線程處理的,你可以在代碼中加入

3 加上調度器,不起作用?

如果你對 subscribeOn 和 observeOn 方法有些印象的話,可能會嘗試使用調度器去解決:

public static void testRxJavaWithBlocking(int count) throws Exception {
        URL url = new URL("http://127.0.0.1:8999/");
        CountDownLatch finishedLatch = new CountDownLatch(1);
        long t = System.nanoTime();
        Observable.range(0, count).map(i -> {
            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                int responseCode = conn.getResponseCode();
                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    //response.append(inputLine);
                }
                in.close();
                return responseCode;
            } catch (Exception ex) {
                return -1;
            }
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(statusCode -> {
        }, error -> {
        }, () -> {
            finishedLatch.countDown();
        });
        finishedLatch.await();
        t = (System.nanoTime() - t) / 1000000; //ms
        System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);
    }

加上 .subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()) 看一下性能:

RxJavaWithBlocking TPS: 30

性能沒有改觀,是時候了解一下RxJava線程調度的問題了。

4 RxJava的線程模型

首先,依照 Observable Contract ,  onNext 是順序執行的,不會同時由多個線程并發執行。

圖片來自 http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

默認情況下,它是在調用subscribe方法的那個線程中執行的。如第一個例子和第二個例子,Rx的操作和消息接收處理都是在同一個線程中執行的。一旦由阻塞,比如第二個例子,久會導致這個線程被阻塞,吞吐量下降。

圖片來自 https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2

但是 subscribeOn 可以改變Observable的運行線程。

圖片來自 https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2

上圖中可以看到,如果你使用了 subscribeOn 方法,則Rx的運行將會切換到另外的線程上,而不是默認的調用線程。

需要注意的是,如果在Observable鏈中調用了多個 subscribeOn 方法,無論調用點在哪里,Observable鏈只會使用第一個 subscribeOn 指定的調度器,正所謂”一見傾情”。

但是 onNext 還是順序執行的,所以第二個例子的性能依然低下。

observeOn 可以中途改變Observable鏈的線程。前面說了, subscribeOn 方法改變的源Observable的整個的運行線程,要想中途切換線程,就需要 observeOn 方法。

圖片來自 http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

官方的一個簡略晦澀的解釋如下:

The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

一圖勝千言:

圖片來自 http://reactivex.io

注意箭頭的顏色和橫軸的顏色,不同的顏色代表不同的線程。

5 Schedulers

上面我們了解了RxJava可以使用 subscribeOn 和 observeOn 可以改變和切換線程,以及 onNext 是順序執行的,不是并發執行,至多也就切換到另外一個線程,如果它中間的操作是阻塞的,久會影響整個Rx的執行。

Rx是通過調度器來選擇哪個線程執行的,RxJava內置了幾種調度器,分別為不同的case提供線程:

  • io()  : 這個調度器時用于I/O操作, 它可以增長或縮減來確定線程池的大小它是使用CachedThreadScheduler來實現的。需要注意的是,它的線程池是無限制的,如果你使用了大量的線程的話,可能會導致OutOfMemory等資源用盡的異常。
  • computation()  : 這個是計算工作默認的調度器,它與I/O操作無關。它也是許多RxJava方法的默認調度器:buffer(),debounce(),delay(),interval(),sample(),skip()。

因為這些方法內部已經調用的調度器,所以你再調用 subscribeOn 是無效的,比如下面的例子總是使用 computation 調度器的線程。

Observable.just(1,2,3)
                .delay(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.newThread())
                .map(i -> {
                    System.out.println("map: " + Thread.currentThread().getName());
                    return i;
                })
                .subscribe(i -> {});
  • immediate()  :這個調度器允許你立即在當前線程執行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默認的調度器。
  • newThread()  :創建一個新的線程只從。
  • trampoline()  :為當前線程建立一個隊列,將當前任務加入到隊列中依次執行。

同時, Schedulers 還提供了 from 靜態方法,用戶可以定制線程池:

ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
Schedulers.from(es)

6 改造,異步執行

現在,我們已經了解了RxJava的線程運行,以及相關的調度器。可以看到上面的例子還是順序阻塞執行的,即使是切換到另外的線程上,依然是順序阻塞執行,顯示它的吞吐率非常非常的低。下一步我們就要改造這個例子,讓它能異步的執行。

下面是一種改造方案,我先把代碼貼出來,再解釋:

public static void testRxJavaWithFlatMap(int count) throws Exception {
    ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
    URL url = new URL("http://127.0.0.1:8999/");
    CountDownLatch finishedLatch = new CountDownLatch(1);
    long t = System.nanoTime();
    Observable.range(0, count).subscribeOn(Schedulers.io()).flatMap(i -> {
                //System.out.println("A: " + Thread.currentThread().getName());
                return Observable.just(i).subscribeOn(Schedulers.from(es)).map(v -> {
                            //System.out.println("B: " + Thread.currentThread().getName());
                            try {
                                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                                conn.setRequestMethod("GET");
                                int responseCode = conn.getResponseCode();
                                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                                String inputLine;
                                while ((inputLine = in.readLine()) != null) {
                                    //response.append(inputLine);
                                }
                                in.close();
                                return responseCode;
                            } catch (Exception ex) {
                                return -1;
                            }
                        }
                );
            }
    ).observeOn(Schedulers.computation()).subscribe(statusCode -> {
        //System.out.println("C: " + Thread.currentThread().getName());
    }, error -> {
    }, () -> {
        finishedLatch.countDown();
    });
    finishedLatch.await();
    t = (System.nanoTime() - t) / 1000000; //ms
    System.out.println("RxJavaWithFlatMap TPS: " + count * 1000 / t);
    es.shutdownNow();
}

通過 flatmap 可以將源Observable的元素項轉成n個Observable,生成的每個Observable可以使用線程池并發的執行,同時flatmap還會將這n個Observable merge成一個Observable。你可以將其中的注釋打開,看看線程的執行情況。

性能還不錯:

RxJavaWithFlatMap TPS: 3906

FlatMap— transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

圖片來自 http://reactivex.io

7 另一種解決方案

我們已經清楚了要并行執行提高吞吐率的解決辦法就是創建多個Observable并且并發執行。基于這種解決方案,我們還可以有其它的解決方案。

上一方案中利用flatmap創建多個Observable,針對我們的例子,我們何不直接創建多個Observable呢?

public static void testRxJavaWithParallel(int count) throws Exception {
    ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
    URL url = new URL("http://127.0.0.1:8999/");
    CountDownLatch finishedLatch = new CountDownLatch(count);
    long t = System.nanoTime();
    for (int k = 0; k < count; k++) {
        Observable.just(k).map(i -> {
            //System.out.println("A: " + Thread.currentThread().getName());
            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                int responseCode = conn.getResponseCode();
                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    //response.append(inputLine);
                }
                in.close();
                return responseCode;
            } catch (Exception ex) {
                return -1;
            }
        }).subscribeOn(Schedulers.from(es)).observeOn(Schedulers.computation()).subscribe(statusCode -> {
        }, error -> {
        }, () -> {
            finishedLatch.countDown();
        });
    }
    finishedLatch.await();
    t = (System.nanoTime() - t) / 1000000; //ms
    System.out.println("RxJavaWithParallel TPS: " + count * 1000 / t);
    es.shutdownNow();
}

性能更好一點:

RxJavaWithParallel2 TPS: 4716。

這個例子沒有使用 Schedulers.io() 作為它的調度器,這是因為如果在大并發的情況下,可能會出現創建過多的線程導致資源不錯,所以我們限定使用200個線程。

8 總結

  • subscribeOn()  改變的Observable運行(operate)使用的調度器,多次調用無效。
  • observeOn()  改變Observable發送notifications的調度器,會影響后續的操作,可以多次調用
  • 默認情況下, 操作鏈使用的線程是調用 subscribe() 的線程
  • Schedulers 提供了多個調度器,可以并行運行多個Observable
  • 使用RxJava可以實現異步編程,但是依然要小心線程阻塞。而且由于這種異步的編程,調試代碼可能更加的困難

9 參考文檔

  1. http://reactivex.io/documentation/contract.html
  2. http://reactivex.io/documentation/operators/subscribeon.html   中文翻譯
  3. http://reactivex.io/documentation/operators/observeon.html   中文翻譯
  4. http://reactivex.io/documentation/scheduler.html
  5. http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html
  6. http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html
  7. https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2   中文翻譯
  8. https://github.com/mcxiaoke/RxDocs

 

來自:http://colobu.com/2016/07/25/understanding-rxjava-thread-model/

 

 本文由用戶 MicEdinburg 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!