RxJava 中的多線程
xJava 中的多線程
大多數情況下,我寫的 Android 代碼都是可以流暢運行的。直到上幾周編寫一個需要讀取和分析大型文件的 app 之前,我從未關心過 app 運行速度的問題。
盡管我期望用戶明白文件越大,耗時越長的道理,有時候他們仍會放棄我的應用。他們可能認為應用卡住了,也可能是因為他們就不想等那么久。所以如果我能把時間縮短至少一半的話,一定會大有裨益的。
第一次嘗試
因為我所有后臺任務都用 RxJava 重寫了,所以繼續用 RxJava 來解決這個問題也是自然而然的。尤其是我還有一些如下所示的代碼:
List<String> dataList;
//這里是數據列表
List<DataModel> result = new ArrayList<>();
for (String data : dataList) {
result.add(DataParser.createData(data));
}</code></pre>
所以我只是想把循環的每個操作放到一個后臺線程中。如下所示:
List<String> dataList;
//這里是數據列表
List<Observable<DataModel>> tasks = new ArrayList<>();
for (String data : dataList) {
tasks.add(Observable.just(data).subscribeOn(Schedulers.io()).map(s -> {
// 返回一個 DataModel 對象
return DataParser.createData(s);
}));
}
List<DataModel> result = new ArrayList<>();
// 等待運行結束并收集結果
for (DataModel dataModel : Observable.merge(tasks).toBlocking().toIterable()) {
result.add(dataModel);
}</code></pre>
的確起作用了,時間減少了近一半。但也導致大量垃圾回收(GC),這使得加載時的 UI 又卡又慢。為了搞清楚問題的原因,我加了一句 log 打印如下信息 Thread.currentThread().getName() 。 這樣我就搞清楚了,我在處理每一段數據時都新建了線程。正如結果所示,創建上千個線程并不是什么好主意。
第二次嘗試
我已經完成了加速數據處理的目標,但運行起來并不那么流暢。我想知道如果不觸發這么多 GC 的話還能不能跑得再快點。所以我自己寫了一個線程池并指定了最大線程數來供 RxJava 調用,省的每次處理數據都要創建新線程:
List<String> dataList;
//這里是數據列表
List<Observable<DataModel>> tasks = new ArrayList<>();
// 取得能夠使用的最大線程數
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(threadPoolExecutor);
for (String data : dataList) {
tasks.add(Observable.just(data).subscribeOn(scheduler).map(s -> {
// 返回一個 DataModel 對象
return DataParser.createData(s);
}));
}
List<DataModel> result = new ArrayList<>();
// 等待運行結束并收集結果
for (DataModel dataModel : Observable.merge(tasks).toBlocking().toIterable()) {
result.add(dataModel);
}</code></pre>
對于單個數據都很大的數據集來說,這樣減少了約 10% 的數據處理時間。然而,對于單個數據都很小的數據集就減少了約 30% 的時間。同時也減少了 GC 的調用次數,但 GC 還是太頻繁。
第三次嘗試
我有一個新想法——如果性能的瓶頸是頻繁的切換和調用線程呢?為了克服這個問題,我可以將數據集根據線程的數目平均分成總數量相等的子集合,每個子合集丟給一個線程處理。這樣雖然是并發運行,但是每個線程被調用的次數將被降低到最小。我嘗試使用 這里 的解決方法來實現我的想法:
List<String> dataList;
//這里是數據列表
// 取得能夠使用的最大線程數
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(threadPoolExecutor);
AtomicInteger groupIndex = new AtomicInteger();
// 以線程數量為依據分組數據,將每組數據放到它們自己的線程中
Iterable<List<DataModel>> resultGroups =
Observable.from(dataList).groupBy(k -> groupIndex.getAndIncrement() % threadCount)
.flatMap(group -> group.observeOn(scheduler).toList().map(sublist -> {
List<DataModel> dataModels = new ArrayList<>();
for (String data : sublist) {
dataModels.add(DataParser.createData(data));
}
return dataModels;
})).toBlocking().toIterable();
List<DataModel> result = new ArrayList<>();
// 等待運行結束并收集結果
for (List<DataModel> dataModels : resultGroups) {
result.addAll(dataModels);
}</code></pre>
上文中我提到用兩類數據集進行測試,一類的數據本身是大文件,但是數據集里包含的數據個數很少;另一類數據集里的每一個數據并不是很大,但是包含數據的總量很多。當我再次測試時,第一組數據幾乎沒差別,而第二組改變相當大。之前幾乎要 20秒,現在只需 5秒。
第二類數據集運行時間改進了如此大的原因,是因為每個線程不再處理一個數據(而是處理一個從總體數據集里拆分下來的小數據集)。之前每一個數據,都需要調用一個線程來處理。現在我減少了調用線程的次數,從而提升了性能。
整理
上面的代碼要執行并發還有一些地方需要修改,所以我整理了代碼并放到工具類中,使其更具有通用性。
/**
- 將數據集拆分成子集并指派給規定數量的線程,并傳入回調來進行具體業務邏輯處理。
<b>T</b> 是要被處理的數據類型,<b>U</b> 是返回的數據類型
*/
public static <T, U> Iterable<U> parseDataInParallel(List<T> data, Func1<List<T>, U> worker) {
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(threadPoolExecutor);
AtomicInteger groupIndex = new AtomicInteger();
return Observable.from(data).groupBy(k -> groupIndex.getAndIncrement() % threadCount)
.flatMap(group -> group.observeOn(scheduler).toList().map(worker)).toBlocking().toIterable();
}
//EXAMPLE USAGE
Iterable<List<DataModel>> resultGroups = Util.parseDataInParallel(dataList,
(sublist) -> {
List<DataModel> dataModels = new ArrayList<>();
for (String data : sublist) {
dataModels.add(DataParser.createData(data));
}
return dataModels;
});
List<DataModel> results = new ArrayList<>();
for (List<DataModel> dataModels : resultGroups) {
results.addAll(dataModels);
}</code></pre>
這里 T 是被處理的數據類型,樣例中是 DataModel 。傳入待處理的 List<T> 并期望結果是 U 。在我的樣例中 U 是 List<DataModel> ,但它可以是任何東西,并不一定是一個 list。傳入的回調函數負責數據子列表具體的業務處理并返回結果。
可以再快點么?
事實上影響運行速度的因素有許多。比如線程管理方式,線程數,設備等。大多數因素我無法控制,但總有一些是我沒有考慮到的。
如果每個數據大小不相等會怎么樣?舉個例子,如果有 4 個線程,每個被指派給第 4 線程的數據大小是被指派給其他線程的十倍會怎么樣?這時第四個線程的耗時就是其他線程的大約 10 倍。這種情況下使用多線程就不會減少多少時間。我的第二次嘗試基本解決了這個問題,因為線程只在需要時才初始化。但這個方法太慢了。
我也試過改變數據分組方式。作為隨意分配的取代,我可以跟蹤每一組數據的總量,然后將數據分配給最少的那組。這樣每個線程的工作量就接近平均了。倒霉的是,測試之后發現這樣做增加的時間遠大于它節省的時間。
數據被分配的大小越平均,處理速度就越快。但大多數情況下,隨機分配看起來更快些。理想情況下是每個線程一有空就分配任務,同時執行分配所消耗的資源也少,這是最高效的。但我找不到一個足夠高效的可以減少分配瓶頸的方法。
總結
所以如果你想用多線程,這是我的建議。如果你有什么好想法,請務必告訴我。得到一個最優解(如果有的話)總是很難的。以及, 能 用多線程并不意味著 必須 用多線程。
來自:https://juejin.im/post/58ff6259da2f60005dd81459