踏破鐵鞋無覓處,從 AsyncTask 學 Android 線程池
android對于主線程的響應時間限制的非常嚴格,稍有不慎就會遇到 Application Not Responding(ANR) 的彈框。用戶可以輕點手指關掉你的APP。 同時,保持應用隨時響應用戶的操作也是良好用戶體驗的前提。
線程的開始和結束
要做到以上多線程是必不可少的。課本會告訴你什么時候開辟一個線程,但是很少說的一個很重要的問題是結束。比如,我現在在Activity里有一個工作需要創建一個線程執行,但是這個Activity在進入后臺后不幸遇到系統回收資源被銷毀了。但是這個線程還在漫無目的的游走,耗費資源。
如何結束?先創建一個:
mThread = Thread(Runnable {
// do something here...
})
mThread?.start()
以上使用kotlin的lambda表達式簡寫了創建 Runnable 對象部分的代碼。主旨還是創建了一個 Runnable 對象,并將其作為參數傳入 Thread 。
如何讓一個 Thread 能夠退出呢?這就要在 Runnable 身上下功夫了。首先添加一個是否停止的標識 isCancelled ,一旦值為true則停止線程的運行,否則繼續。我們這里不討論 Thread#interrupt() 這個方法,這個方法詭異的地方太多。
首先要給 Runnable “添加一個屬性”作為上文的是否停止的標識。直接添加時不可能的, Runnable 只是一個 interface ,不是 class 。所以要實現這個借口為一個抽象類,這樣就可以添加屬性了。
abstract class CancelableRunnable() : Runnable {
var isCancelled: Boolean = false
}
這里使用抽象類,是因為 run() 方法的實現留給使用的時候給出。
var runnable = object : CancelableRunnable() {
override fun run() {
if (isCancelled) {
var msg = mHandler.obtainMessage(THREAD_CANCELLED)
mHandler.sendMessage(msg)
return
}
Thread.sleep(2000)
if (isCancelled) {
var msg = mHandler.obtainMessage(THREAD_CANCELLED)
mHandler.sendMessage(msg)
return
}
var msg = mHandler.obtainMessage(THREAD_FINISHED)
mHandler.sendMessage(msg)
}
}
Thread.sleep(2000) 用來模擬一個費時的任務。開始之前檢測是否取消了線程的執行,執行之后在檢測。之后的檢測是有的時候任務執行之后需要有持久化處理結果或者修改任務完成情況的標識之類的動作,如果已經取消了線程的執行,即使任務執行完成也不持久化結果、不修改完成情況。
最后都檢測完成之后如果沒有取消線程,則發出任務完成執行的消息。
發出和處理這些消息的 Handler 的定義:
var mHandler = object : Handler() {
override fun handleMessage(msg: Message?) {
when (msg?.what) {
THREAD_CANCELLED -> {
mResultTextView.text = "thread cancelled"
}
THREAD_FINISHED -> {
mResultTextView.text = "thread finished"
}
else -> {
mResultTextView.text = "what's going on"
}
}
}
}
運行在UI線程的 Handler 檢測從線程發出的消息,如果是 THREAD_CANCELLED 那就是線程已經取消了,如果是 THREAD_FINISHED 那就是線程完全運行結束。之后根據message的消息設置 TextView 的文本內容。
這里使用了兩個按鈕來啟動和停止線程:
findViewById(R.id.start_button)?.setOnClickListener { v ->
runnable.isCancelled = false
mThread = Thread(runnable)
mThread?.start()
mResultTextView.text = "Thread running..."
}
findViewById(R.id.stop_button)?.setOnClickListener { v ->
this.runnable.isCancelled = true
}
上面用到的 Runnable 是只做一件事的,如果是連續不斷的循環很多事的話也可以使用 white 語句來控制是否一直執行線程的工作。一旦設置為停止線程,則停止線程任務的循環跳出 Runnable#run() 方法,結束線程。
完整代碼放在附錄中。
所以,如果你在Activity里開辟了一個線程,在Activity被回收的時候結束線程就可以這么做:
override fun onDestroy() {
super.onDestroy()
this.runnable.isCancelled = true
}
這樣就再也不用擔心Activity掛了,線程還陰魂不散了。
AsyncTask
既然緣起 AsyncTask 那就肯定需要讀者一起了解一下相關的概念。
比起來使用 Handler + Thread + Runnable 的多線程異步執行模式來說,使用 AsyncTask 是簡單了非常的多的。
先簡單了解一下 AsyncTask 。
public abstract class AsyncTask<Params, Progress, Result>
AsyncTask 是一個抽象泛型類。三個類型 Params , Progress , Result 分別對應的是輸入參數的類型,精度更新使用的類型,最后是返回結果的類型。其中任何一個類型如果你不需要的話,可以使用 java.lang.Void 代替。
繼承 AsyncTask 給出自己的實現,最少需要實現 doInBackground 方法。 doInBackground 方法是在后臺線程中運行的。如果要在任務執行之后更新UI線程的話還至少需要給出 onPostExecute 方法的實現,在這個方法中才可以更新UI。
上述的兩個方法已經構成了一個 AsyncTask 使用的基本單元。在后臺線程處理一些任務,并在處理完成之后更新UI。但是如果一個任務比較長,只是在最后更新UI是不夠的,還需要不斷的提示用戶已經完成的進度是多少。這就是需要另外實現 onProgressUpdate 方法。并在 doInBackground 方法中調用 publishProgress 方法發出每個任務的處理進度。
這個 AsyncTask 總體上就是這樣的了:
inner class DemoAsyncTask() : AsyncTask<String, Int, String>() {
// var isRunning = true
override fun doInBackground(vararg params: String?): String? {
Log.i(TAG, "##AsyncTask doing something...")
var i = 0
val TOTAL = 100000000
var progress = 0
while (i < TOTAL) {
Log.d(TAG, "doning jobs $i is cancelled $isCancelled")
i++
var currentProgress = i.toFloat() / TOTAL
if (currentProgress > progress && Math.abs(currentProgress - progress) > 0.1) {
progress = currentProgress
publishProgress((progress * 100).toInt())
}
}
}
Log.d(TAG, "doing jobs $i is cancelled $isCancelled")
return "Task done"
}
override fun onPostExecute(result: String?) {
this@CancalableActivity.mAsyncTextView?.text = result
}
override fun onProgressUpdate(vararg values: Int?) {
mAsyncTextView?.text = "${mAsyncTextView?.text ?: "Async task..."} progress: ${values?.get(0) ?: 0}"
}
}
到這里各位讀者應該對 AsyncTask 已經有一個總體的認識了。后臺任務在 doInBackground 處理,處理過程的百分比使用 publishProgress 方法通知,并在 onProgressUpdate 方法中更新UI的百分比。最后任務處理全部完成之后在 onPostExecute 更新UI,顯示全部完成。
怎么取消一個任務的執行呢?這個機本身還上面的線程的取消基本上一樣。只是 AsyncTask 已經提供了足夠的屬性和方法完成取消的工作。直接調用 AsyncTask#cancel 方法就可以發出取消的信號,但是是否可以取消還要看這個方法的返回值是什么。如果是 true 那就是可以,否則任務不可取消(但是不可取消的原因很可能是任務已經執行完了)。
調用 cancel 方法發出取消信號,并且可以取消的時候。 isCancelled() 就會返回 true 。同時 onPostExecute 這個方法就不會再被調用了。而是 onCancelled(object) 方法被調用。同樣是在 doInBackground 這個方法執行完之后調用。所以,如果想要在取消任務執行后盡快的調用到 onCancelled(object) 的話,就需要在 onInBackground 的時候不斷的檢查 isCancelled() 是否返回 true 。如果返回的是 true 就跳出方法的執行。
inner class DemoAsyncTask() : AsyncTask<String, Int, String>() {
// var isRunning = true
override fun doInBackground(vararg params: String?): String? {
Log.i(TAG, "##AsyncTask doing something...")
var i = 0
val TOTAL = 1000000
var progress = 0.0f
while (i < TOTAL && !isCancelled) {
Log.d(TAG, "doning jobs $i is cancelled $isCancelled")
i++
var currentProgress = i.toFloat() / TOTAL
if (currentProgress > progress && Math.abs(currentProgress - progress) > 0.1) {
progress = currentProgress
publishProgress((progress * 100).toInt())
}
}
Log.d(TAG, "doning jobs $i is cancelled $isCancelled")
return "Task done"
}
override fun onPostExecute(result: String?) {
this@CancalableActivity.mAsyncTextView?.text = result
}
override fun onProgressUpdate(vararg values: Int?) {
mAsyncTextView?.text = "${mAsyncTextView?.text ?: "Async task..."} progress: ${values?.get(0) ?: 0}"
}
override fun onCancelled() {
Log.i(TAG, "##Task cancelled")
// isRunning = false
this@CancalableActivity.mAsyncTextView?.text = "###Task cancelled"
}
// override fun onCancelled(result: String?) {
// Log.i(TAG, "##Task cancelled")
//// isRunning = false
// this@CancalableActivity.mAsyncTextView?.text = result ?: "Task cancelled"
// }
}
onCancelled() 是API level 3的時候加入的。 onCancelled(Result result) 是API level 11的時候加入的。這個在兼容低版本的時候需要注意。
但是一點需要格外注意:
AsyncTask一定要在UI線程初始化。不過在**JELLY_BEAN**以后這個問題也解決了。
總之,在UI線程初始化你的`AsyncTask`肯定是不會錯的。
線程池
下面就來看看線程池的概念。顧名思義,線程池就是放線程的池子。把費時費力,或者影響響應用戶操作的代碼放在另外一個線程執行時常有的事。但是如果無顧忌的開辟線程,卻會適得其反,嚴重的浪費系統資源。于是就有了線程池。線程池就是通過 某些機制讓線程不要創建那么多,能復用就復用,實在不行就讓任務排隊等一等 。
這個機制在線程池的構造函數里體現的非常明顯:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize線程池里閑著也不回收的線程數量。除非 allowCoreThreadTimeOut 指定可以回收。
-
maximumPoolSize線程池允許的最大線程數。
-
keepAliveTime非核心線程(就是如果核心線程數量 corePoolSize 定義為1的話,第二個就是非核心線程)的超時時間。
-
unit keepAliveTime 的時間單位,毫秒,秒等。
-
workQueue存放 execute(Runnable cmd) 方法提交的 Runnable 任務。
-
threadFactory線程池用來創建新線程用的一個工廠類。
-
handler線程池達到最大線程數,并且任務隊列也已經滿的時候會拒絕 execute(Runnable cmd) 方法提交任務。這個時候調用這個handler。
知道以上基本內容以后,就可以探討線程池管理線程的機制了。概括起來有三點:
-
如果線程池的線程數量少于 corePoolSize 的時候,線程池會使用 threadFactory 這個線程工廠創建新的線程執行 Runnable 任務。
-
如果線程池的線程數量大于 corePoolSize 的時候,線程池會把 Runnable 任務存放在隊列 workQueue 中。
-
線程池的線程數量大于 corePoolSize ,隊列 workQueue 已滿,而且小于 maximumPoolSize 的時候,線程池會創建新的線程執行 Runnable 任務。否則,任務被拒。
現在回到 AsyncTask 。被人廣為詬病的 AsyncTask 是他的任務都是順序執行的。 一個AsyncTask的實例只能處理一個任務 。但是在 AsyncTask 后面處理任務的是一個靜態的線程池。在看這個線程池 SerialExecutor 的 execute 方法實現:
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
// 執行一個task
}
}
這個線程池 SerialExecutor 在處理 Runnable 的傳入參數的時候對這個任務進行了重新包裝成了一個新的 Runnable 對象,并且將這個新的對象存入了一個叫做 mTasks 的隊列。這個新的 Runnable 對象首先執行傳入的任務,之后不管有無異常調用 scheduleNext 方法執行下一個。于是整體的就生成了一個傳入的任務都順序執行的邏輯。
這個線性執行的靜態線程池 SerialExecutor 的實現非常簡單。并不涉及到我們前文所說的那么多復雜的內容。在實現上,這個線程池只實現了線程池的最頂層接口 Executor 。這個接口只有一個方法就是 execute(Runnable r) 。另外需要強調一點: mTasks 的類型 ArrayDeque<T> 是一個不受大小限制的隊列。可以存放任意多的任務。在線程池的討論中遇到隊列就需要看看容量概念。
SerialExecutor 只是進行了簡單的隊列排列。但是在 scheduleNext 方法的實現上又會用到一個復雜一些的線程池來執行任務的具體執行。這線程池叫做
THREAD_POOL_EXECUTOR 。我們來具體看看其實現:
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
這個線程池的實現非常具有現實價值。雖然稍后介紹的系統提供的幾種線程池的實現就夠用。但是難免遇到一些需要自定義線程池的情況。詳細解析如下:
-
CORE_POOL_SIZE線程池的核心線程數量為設備核心數加一。
-
MAXIMUM_POOL_SIZE線程池的最大線程數量為核心數的兩倍加一。
-
KEEP_ALIVE線程池中非核心線程的超時時間為一秒。
-
sPoolWorkQueue線程池存放任務的隊列。最大個數為 128 個。參考上面說的線程池處理機制,會出現任務被拒的情況。排隊的線程池 SerialExecutor 存放任務的隊列是可以認為無限長的,但是 THREAD_POOL_EXECUTOR 的隊列最多存放128個任務,加上線程池核心線程的數量,能處理的任務相對有限。出現任務被拒的情況的幾率比較大。所以,往 AsyncTask 里直接添加 Runnable 對象的時候需要三思。
-
sThreadFactory線程池用來創建線程的工廠對象。 ThreadFactory 是一個只有一個方法 Thread newThread(Runnable r); 的接口。這里在實現的時候給新創建的線程添加了一個原子計數,并把這個計數作為線程名稱傳遞給了線程的構造函數。
到這里,我們就已經很清楚 AsyncTask 是如何用一個極其簡單的線程池 SerialExecutor 給任務排隊的。又是如何使用一個復雜一些的線程池 THREAD_POOL_EXECUTOR 來處理具體的任務執行的。尤其是線程池 THREAD_POOL_EXECUTOR ,在我們實際應用一個自定義的線程池的時候在設定線程池核心線程數量,線程池最大線程數量的時候都依據什么?明顯就是設備的CPU核心數。線程分別在不同個CPU核心中做并行的處理。核心數多可以同時處理的線程數就相對較多,相反則會比較少一些。如此設置核心線程數量就會平衡并行處理的任務數量和在處理的過程中耗費的系統資源。
為了讓開發者省時省力,系統默認的提供了四種可以適應不同應用條件的線程池:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
-
newFixedThreadPool顧名思義,線程數量固定的線程池,且其數量等于參數指定值。這一類型的線程池的核心線程數量和最大線程數量是一樣的。存放任務的隊列的容量可以被認為無限大。一旦線程池創建的線程數量等 nThreads 參數值的時候,新增的任務將會被存放在任務隊列中等待核心線程可用的時候執行。
-
newSingleThreadExecutor newFixedThreadPool 的一個特殊情況,當 mThreads 值為1的時候。
-
newCachedThreadPool這一類型的線程池中創建的線程都有60秒的超時時間,由于超時時間比較長等于是線程空閑了以后被緩存了60秒。由于核心線程數量為0,所以創建的線程都是非核心線程。也因此超時時間才管用。任務隊列 SynchronousQueue 非常特殊,簡單理解就是一個任務都存放不了。而線程池的最大線程數量又設定為 Integer.MAX_VALUE ,可以認為是無限大。根據線程池處理任務的機制,可以認為有新任務過來就會創建一個線程去處理這個任務,但是如果存在空閑沒有超時的線程會優先使用。
-
newScheduledThreadPool生成一個 ScheduledThreadPoolExecutor 實例。可以通過其提供的接口方法設定延遲一定的時間執行或者隔一定的時間周期執行。
來一個例子:
import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
public void beepForAnHour() {
final Runnable beeper = new Runnable() {
public void run() { System.out.println("beep");
};
final ScheduledFuture beeperHandle =
scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
scheduler.schedule(new Runnable() {
public void run() { beeperHandle.cancel(true); }
}, 60 * 60, SECONDS);
}
}}
附錄
這里是上面例子中使用的全部代碼。
線程的停止:
package demo.retrofit2rxjavademo.Activities
import android.os.Bundle
import android.os.Handler
import android.os.Message
import android.support.v7.app.AppCompatActivity
import android.widget.TextView
import demo.retrofit2rxjavademo.R
class CancalableActivity : AppCompatActivity() {
lateinit var mResultTextView: TextView
var mHandler = object : Handler() {
override fun handleMessage(msg: Message?) {
when (msg?.what) {
THREAD_CANCELLED -> {
mResultTextView.text = "thread cancelled"
}
THREAD_FINISHED -> {
mResultTextView.text = "thread finished"
}
else -> {
mResultTextView.text = "what's going on"
}
}
}
}
var mThread: Thread? = null
var runnable = object : CancelableRunnable() {
override fun run() {
if (isCancelled) {
var msg = mHandler.obtainMessage(THREAD_CANCELLED)
mHandler.sendMessage(msg)
return
}
Thread.sleep(2000)
if (isCancelled) {
var msg = mHandler.obtainMessage(THREAD_CANCELLED)
mHandler.sendMessage(msg)
return
}
var msg = mHandler.obtainMessage(THREAD_FINISHED)
mHandler.sendMessage(msg)
}
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_cancalable)
mResultTextView = findViewById(R.id.run_result_text_view) as TextView
findViewById(R.id.start_button)?.setOnClickListener { v ->
runnable.isCancelled = false
mThread = Thread(runnable)
mThread?.start()
mResultTextView.text = "Thread running..."
}
findViewById(R.id.stop_button)?.setOnClickListener { v ->
this.runnable.isCancelled = true
}
}
abstract class CancelableRunnable() : Runnable {
var isCancelled: Boolean = false
}
companion object {
val THREAD_FINISHED = 0
val THREAD_CANCELLED = 1
}
}
來自:https://segmentfault.com/a/1190000006880107