RxJava 中的多線程

dygzlmx 7年前發布 | 17K 次閱讀 RxJava 多線程

xJava 中的多線程

大多數情況下,我寫的 Android 代碼都是可以流暢運行的。直到上幾周編寫一個需要讀取和分析大型文件的 app 之前,我從未關心過 app 運行速度的問題。

盡管我期望用戶明白文件越大,耗時越長的道理,有時候他們仍會放棄我的應用。他們可能認為應用卡住了,也可能是因為他們就不想等那么久。所以如果我能把時間縮短至少一半的話,一定會大有裨益的。

第一次嘗試

因為我所有后臺任務都用 RxJava 重寫了,所以繼續用 RxJava 來解決這個問題也是自然而然的。尤其是我還有一些如下所示的代碼:

List<String> dataList;
//這里是數據列表

List<DataModel> result = new ArrayList<>(); for (String data : dataList) { result.add(DataParser.createData(data)); }</code></pre>

所以我只是想把循環的每個操作放到一個后臺線程中。如下所示:

List<String> dataList;
//這里是數據列表

List<Observable<DataModel>> tasks = new ArrayList<>();

for (String data : dataList) { tasks.add(Observable.just(data).subscribeOn(Schedulers.io()).map(s -> { // 返回一個 DataModel 對象 return DataParser.createData(s); })); }

List<DataModel> result = new ArrayList<>();

// 等待運行結束并收集結果 for (DataModel dataModel : Observable.merge(tasks).toBlocking().toIterable()) { result.add(dataModel); }</code></pre>

的確起作用了,時間減少了近一半。但也導致大量垃圾回收(GC),這使得加載時的 UI 又卡又慢。為了搞清楚問題的原因,我加了一句 log 打印如下信息 Thread.currentThread().getName() 。 這樣我就搞清楚了,我在處理每一段數據時都新建了線程。正如結果所示,創建上千個線程并不是什么好主意。

第二次嘗試

我已經完成了加速數據處理的目標,但運行起來并不那么流暢。我想知道如果不觸發這么多 GC 的話還能不能跑得再快點。所以我自己寫了一個線程池并指定了最大線程數來供 RxJava 調用,省的每次處理數據都要創建新線程:

List<String> dataList;
//這里是數據列表

List<Observable<DataModel>> tasks = new ArrayList<>();

// 取得能夠使用的最大線程數 int threadCount = Runtime.getRuntime().availableProcessors(); ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount); Scheduler scheduler = Schedulers.from(threadPoolExecutor);

for (String data : dataList) { tasks.add(Observable.just(data).subscribeOn(scheduler).map(s -> { // 返回一個 DataModel 對象 return DataParser.createData(s); })); }

List<DataModel> result = new ArrayList<>();

// 等待運行結束并收集結果 for (DataModel dataModel : Observable.merge(tasks).toBlocking().toIterable()) { result.add(dataModel); }</code></pre>

對于單個數據都很大的數據集來說,這樣減少了約 10% 的數據處理時間。然而,對于單個數據都很小的數據集就減少了約 30% 的時間。同時也減少了 GC 的調用次數,但 GC 還是太頻繁。

第三次嘗試

我有一個新想法——如果性能的瓶頸是頻繁的切換和調用線程呢?為了克服這個問題,我可以將數據集根據線程的數目平均分成總數量相等的子集合,每個子合集丟給一個線程處理。這樣雖然是并發運行,但是每個線程被調用的次數將被降低到最小。我嘗試使用 這里 的解決方法來實現我的想法:

List<String> dataList;
//這里是數據列表

// 取得能夠使用的最大線程數 int threadCount = Runtime.getRuntime().availableProcessors(); ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount); Scheduler scheduler = Schedulers.from(threadPoolExecutor);

AtomicInteger groupIndex = new AtomicInteger();

// 以線程數量為依據分組數據,將每組數據放到它們自己的線程中 Iterable<List<DataModel>> resultGroups = Observable.from(dataList).groupBy(k -> groupIndex.getAndIncrement() % threadCount) .flatMap(group -> group.observeOn(scheduler).toList().map(sublist -> { List<DataModel> dataModels = new ArrayList<>(); for (String data : sublist) { dataModels.add(DataParser.createData(data)); } return dataModels; })).toBlocking().toIterable();

List<DataModel> result = new ArrayList<>();

// 等待運行結束并收集結果 for (List<DataModel> dataModels : resultGroups) { result.addAll(dataModels); }</code></pre>

上文中我提到用兩類數據集進行測試,一類的數據本身是大文件,但是數據集里包含的數據個數很少;另一類數據集里的每一個數據并不是很大,但是包含數據的總量很多。當我再次測試時,第一組數據幾乎沒差別,而第二組改變相當大。之前幾乎要 20秒,現在只需 5秒。

第二類數據集運行時間改進了如此大的原因,是因為每個線程不再處理一個數據(而是處理一個從總體數據集里拆分下來的小數據集)。之前每一個數據,都需要調用一個線程來處理。現在我減少了調用線程的次數,從而提升了性能。

整理

上面的代碼要執行并發還有一些地方需要修改,所以我整理了代碼并放到工具類中,使其更具有通用性。

/**

  • 將數據集拆分成子集并指派給規定數量的線程,并傳入回調來進行具體業務邏輯處理。
  • <b>T</b> 是要被處理的數據類型,<b>U</b> 是返回的數據類型 */ public static <T, U> Iterable<U> parseDataInParallel(List<T> data, Func1<List<T>, U> worker) { int threadCount = Runtime.getRuntime().availableProcessors(); ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount); Scheduler scheduler = Schedulers.from(threadPoolExecutor);

    AtomicInteger groupIndex = new AtomicInteger();

    return Observable.from(data).groupBy(k -> groupIndex.getAndIncrement() % threadCount)

         .flatMap(group -> group.observeOn(scheduler).toList().map(worker)).toBlocking().toIterable();

}

//EXAMPLE USAGE Iterable<List<DataModel>> resultGroups = Util.parseDataInParallel(dataList, (sublist) -> { List<DataModel> dataModels = new ArrayList<>(); for (String data : sublist) { dataModels.add(DataParser.createData(data)); } return dataModels; });

List<DataModel> results = new ArrayList<>(); for (List<DataModel> dataModels : resultGroups) { results.addAll(dataModels); }</code></pre>

這里 T 是被處理的數據類型,樣例中是 DataModel 。傳入待處理的 List<T> 并期望結果是 U 。在我的樣例中 U 是 List<DataModel> ,但它可以是任何東西,并不一定是一個 list。傳入的回調函數負責數據子列表具體的業務處理并返回結果。

可以再快點么?

事實上影響運行速度的因素有許多。比如線程管理方式,線程數,設備等。大多數因素我無法控制,但總有一些是我沒有考慮到的。

如果每個數據大小不相等會怎么樣?舉個例子,如果有 4 個線程,每個被指派給第 4 線程的數據大小是被指派給其他線程的十倍會怎么樣?這時第四個線程的耗時就是其他線程的大約 10 倍。這種情況下使用多線程就不會減少多少時間。我的第二次嘗試基本解決了這個問題,因為線程只在需要時才初始化。但這個方法太慢了。

我也試過改變數據分組方式。作為隨意分配的取代,我可以跟蹤每一組數據的總量,然后將數據分配給最少的那組。這樣每個線程的工作量就接近平均了。倒霉的是,測試之后發現這樣做增加的時間遠大于它節省的時間。

數據被分配的大小越平均,處理速度就越快。但大多數情況下,隨機分配看起來更快些。理想情況下是每個線程一有空就分配任務,同時執行分配所消耗的資源也少,這是最高效的。但我找不到一個足夠高效的可以減少分配瓶頸的方法。

總結

所以如果你想用多線程,這是我的建議。如果你有什么好想法,請務必告訴我。得到一個最優解(如果有的話)總是很難的。以及, 用多線程并不意味著 必須 用多線程。

 

來自:https://juejin.im/post/58ff6259da2f60005dd81459

 

 本文由用戶 dygzlmx 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!