Python 線程與協程
線程
要說到線程(Thread)與協程(Coroutine)似乎總是需要從并行(Parallelism)與并發(Concurrency)談起,關于并行與并發的問題, Rob Pike 用 Golang 小地鼠燒書的例子 給出了非常生動形象的說明。簡單來說并行就是我們現實世界運行的樣子,每個人都是獨立的執行單元,各自完成自己的任務,這對應著計算機中的分布式(多臺計算機)或多核(多個CPU)運作模式;而對于并發,我看到最生動的解釋來自 Quora 上 Jan Christian Meyer 回答的這張圖 :
并發對應計算機中充分利用單核(一個CPU)實現(看起來)多個任務同時執行。我們在這里將要討論的 Python 中的線程與協程僅是基于單核的并發實現,隨便去網上搜一搜(Thread vs Coroutine)可以找到一大批關于它們性能的爭論、benchmark,這次話題的目的不在于討論誰好誰壞,套用一句非常套路的話來說,拋開應用場景爭好壞都是耍流氓。當然在硬件支持的條件下(多核)也可以利用線程和協程實現并行計算,而且 Python 2.6 之后新增了標準庫 multiprocessing ( PEP 371 )突破了 GIL 的限制可以充分利用多核,但由于協程是基于單個線程的,因此多進程的并行對它們來說情況是類似的,因此這里只討論單核并發的實現。
要了解線程以及協程的原理和由來可以查看參考鏈接中的前兩篇文章。Python 3.5 中關于線程的標準庫是 threading ,之前在 2.x 版本中的 thread 在 3.x 之后更名為 _thread ,無論是2.7還是3.5都應該盡量避免使用較為底層的 thread/_thread 而應該使用 threading 。
創建一個線程可以通過實例化一個 threading.Thread 對象:
from threading import Thread
import time
def _sum(x, y):
print("Compute {} + {}...".format(x, y))
time.sleep(2.0)
return x+y
def compute_sum(x, y):
result = _sum(x, y)
print("{} + {} = {}".format(x, y, result))
start = time.time()
threads = [
Thread(target=compute_sum, args=(0,0)),
Thread(target=compute_sum, args=(1,1)),
Thread(target=compute_sum, args=(2,2)),
]
for t in threads:
t.start()
for t in threads:
t.join()
print("Total elapsed time {} s".format(time.time() - start))
Do not use Thread
start = time.time()
compute_sum(0,0)
compute_sum(1,1)
compute_sum(2,2)
print("Total elapsed time {} s".format(time.time() - start))
</code></pre>
Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 = 0
1 + 1 = 2
2 + 2 = 4
Total elapsed time 2.002729892730713 s
Compute 0 + 0...
0 + 0 = 0
Compute 1 + 1...
1 + 1 = 2
Compute 2 + 2...
2 + 2 = 4
Total elapsed time 6.004806041717529 s
除了通過將函數傳遞給 Thread 創建線程實例之外,還可以直接繼承 Thread 類:
from threading import Thread
import time
class ComputeSum(Thread):
def __init__(self, x, y):
super().__init__()
self.x = x
self.y = y
def run(self):
result = self._sum(self.x, self.y)
print("{} + {} = {}".format(self.x, self.y, result))
def _sum(self, x, y):
print("Compute {} + {}...".format(x, y))
time.sleep(2.0)
return x+y
threads = [ComputeSum(0,0), ComputeSum(1,1), ComputeSum(2,2)]
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
print("Total elapsed time {} s".format(time.time() - start))
Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 = 0
1 + 1 = 2
2 + 2 = 4
Total elapsed time 2.001662015914917 s
根據上面代碼執行的結果可以發現, compute_sum/t.run 函數的執行是按照 start() 的順序,但 _sum 結果的輸出順序卻是隨機的。因為 _sum 中加入了 time.sleep(2.0) ,讓程序執行到這里就會進入阻塞狀態,但是幾個線程的執行看起來卻像是同時進行的(并發)。
有時候我們既需要并發地“跳過“阻塞的部分,又需要有序地執行其它部分,例如操作共享數據的時候,這時就需要用到”鎖“。在上述”求和線程“的例子中,假設每次求和都需要加上額外的 _base 并把計算結果累積到 _base 中。盡管這個例子不太恰當,但它說明了線程鎖的用途:
from threading import Thread, Lock
import time
_base = 1
_lock = Lock()
class ComputeSum(Thread):
def init(self, x, y):
super().init()
self.x = x
self.y = y
def run(self):
result = self._sum(self.x, self.y)
print("{} + {} + base = {}".format(self.x, self.y, result))
def _sum(self, x, y):
print("Compute {} + {}...".format(x, y))
time.sleep(2.0)
global _base
with _lock:
result = x + y + _base
_base = result
return result
threads = [ComputeSum(0,0), ComputeSum(1,1), ComputeSum(2,2)]
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
print("Total elapsed time {} s".format(time.time() - start))
</code></pre>
Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 + base = 1
1 + 1 + base = 3
2 + 2 + base = 7
Total elapsed time 2.0064051151275635 s
這里用 上下文管理器 來管理鎖的獲取和釋放,相當于:
_lock.acquire()
try:
result = x + y + _base
_base = result
finally:
_lock.release()
死鎖
線程的一大問題就是通過加鎖來”搶奪“共享資源的時候有可能造成死鎖,例如下面的程序:
from threading import Lock
_base_lock = Lock()
_pos_lock = Lock()
_base = 1
def _sum(x, y):
# Time 1
with _base_lock:
# Time 3
with _pos_lock:
result = x + y
return result
def _minus(x, y):
# Time 0
with _pos_lock:
# Time 2
with _base_lock:
result = x - y
return result
</code></pre>
由于線程的調度執行順序是不確定的,在執行上面兩個線程 _sum/_minus 的時候就有可能出現注釋中所標注的時間順序,即 # Time 0 的時候運行到 with _pos_lock 獲取了 _pos_lock 鎖,而接下來由于阻塞馬上切換到了 _sum 中的 # Time 1 ,并獲取了 _base_lock ,接下來由于兩個線程互相鎖定了彼此需要的下一個鎖,將會導致死鎖,即程序無法繼續運行。根據 我是一個線程 中所描述的,為了避免死鎖,需要所有的線程按照指定的算法(或優先級)來進行加鎖操作。不管怎么說,死鎖問題都是一件非常傷腦筋的事,原因之一在于不管線程實現的是并發還是并行,在編程模型和語法上看起來都是并行的,而我們的大腦雖然是一個(內隱的)絕對并行加工的機器,卻非常不善于將并行過程具象化(至少在未經足夠訓練的時候)。而與線程相比,協程(尤其是結合事件循環)無論在編程模型還是語法上,看起來都是非常友好的單線程同步過程。后面第二部分我們再來討論 Python 中協程是如何從”小三“一步步扶正上位的 :D 。
協程
我之前翻譯了Python 3.5 協程原理這篇文章之后嘗試用了 Tornado + Motor 模式下的協程進行異步開發,確實感受到協程所帶來的好處(至少是語法上的 :D )。至于協程的 async/await 語法是如何由開始的 yield 生成器一步一步上位至 Python 的 async/await 組合語句,前面那篇翻譯的文章里面講得已經非常詳盡了。我們知道協程的本質上是:
allowing multiple entry points for suspending and resuming execution at certain locations.
允許多個入口對程序進行掛起、繼續執行等操作,我們首先想到的自然也是生成器:
def jump_range(upper):
index = 0
while index < upper:
jump = yield index
if jump is None:
jump = 1
index += jump
jump = jump_range(5)
print(jump)
print(jump.send(None))
print(jump.send(3))
print(jump.send(None))
<generator object jump_range at 0x10e283518>
0
3
4
后來又新增了 yield from 語法,可以將生成器串聯起來:
def wait_index(i):
# processing i...
return (yield i)
def jump_range(upper):
index = 0
while index < upper:
jump = yield from wait_index(index)
if jump is None:
jump = 1
index += jump
jump = jump_range(5)
print(jump)
print(jump.send(None))
print(jump.send(3))
print(jump.send(None))
</code></pre>
<generator object jump_range at 0x10e22a780>
0
3
4
yield from / send 似乎已經滿足了協程所定義的需求,最初也確實是用 @types.coroutine 修飾器 將生成器轉換成協程來使用,在 Python 3.5 之后則以專用的 async/await 取代了 @types.coroutine/yield from :
class Wait(object):
"""
由于 Coroutine 協議規定 await 后只能跟 awaitable 對象,
而 awaitable 對象必須是實現了 __await__ 方法且返回迭代器
或者也是一個協程對象,
因此這里臨時實現一個 awaitable 對象。
"""
def __init__(self, index):
self.index = index
def __await__(self):
return (yield self.index)
async def jump_range(upper):
index = 0
while index < upper:
jump = await Wait(index)
if jump is None:
jump = 1
index += jump
jump = jump_range(5)
print(jump)
print(jump.send(None))
print(jump.send(3))
print(jump.send(None))
<coroutine object jump_range at 0x10e2837d8>
0
3
4
與線程相比
協程的執行過程如下所示:
import asyncio
import time
import types
@types.coroutine
def _sum(x, y):
print("Compute {} + {}...".format(x, y))
yield time.sleep(2.0)
return x+y
@types.coroutine
def compute_sum(x, y):
result = yield from _sum(x, y)
print("{} + {} = {}".format(x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(compute_sum(0,0))
</code></pre>
Compute 0 + 0...
0 + 0 = 0

這張圖(來自: PyDocs: 18.5.3. Tasks and coroutines )清楚地描繪了由事件循環調度的協程的執行過程,上面的例子中事件循環的隊列里只有一個協程,如果要與上一部分中線程實現的并發的例子相比較,只要向事件循環的任務隊列中添加協程即可:
import asyncio
import time
上面的例子為了從生成器過度,下面全部改用 async/await 語法
async def _sum(x, y):
print("Compute {} + {}...".format(x, y))
await asyncio.sleep(2.0)
return x+y
async def compute_sum(x, y):
result = await _sum(x, y)
print("{} + {} = {}".format(x, y, result))
start = time.time()
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(compute_sum(0, 0)),
asyncio.ensure_future(compute_sum(1, 1)),
asyncio.ensure_future(compute_sum(2, 2)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print("Total elapsed time {}".format(time.time() - start))
</code></pre>
Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 = 0
1 + 1 = 2
2 + 2 = 4
Total elapsed time 2.0042951107025146
總結
這兩篇主要關于 Python 中的線程與協程的一些基本原理與用法,為此我搜索了不少參考文章與鏈接,對我自己理解它們的原理與應用場景也有很大的幫助(當然也有可能存在理解不到位的地方,歡迎指正)。當然在這里還是主要關注基于 Python 的語法與應用,如果想要了解更多底層實現的細節,可能需要從系統調度等底層技術細節開始學習(幾年前我記得翻閱過《深入理解LINUX內核》這本書,雖然大部分細節已經記不清楚了,但對于理解其它人的分析、總結還是有一定幫助的)。這里討論的基于協程的異步主要是借助于事件循環(由 asyncio 標準庫提供),包括上文中的示意圖,看起來很容易讓人聯想到 Node.js 的事件循環 & 回調,但是協程與回調也還是有區別的,具體就不在這里展開了,可以參考下面第一條參考鏈接。
參考
- Python 中的進程、線程、協程、同步、異步、回調
- 我是一個線程
- Concurrency is not Parallelism
- A Curious Course on Coroutines and Concurrency
- PyDocs: 17.1. threading — Thread-based parallelism
- PyDocs: 18.5.3. Tasks and coroutines
- [譯] Python 3.5 協程究竟是個啥
- 協程的好處是什么? - crazybie 的回答
- Py3-cookbook:第十二章:并發編程
- Quora: What are the differences between parallel, concurrent and asynchronous programming?
- Real-time apps with gevent-socketio
來自: http://blog.rainy.im/2016/04/07/python-thread-and-coroutine/