Python并發編程之協程/異步IO
引言
隨著node.js的盛行,相信大家今年多多少少都聽到了異步編程這個概念。Python社區雖然對于異步編程的支持相比其他語言稍顯遲緩,但是也在Python3.4中加入了 asyncio ,在Python3.5上又提供了async/await語法層面的支持,剛正式發布的 Python3.6 中asynico也已經由臨時版改為了穩定版。下面我們就基于 Python3.4+ 來了解一下異步編程的概念以及asyncio的用法。
什么是協程
通常在Python中我們進行并發編程一般都是使用多線程或者多進程來實現的,對于計算型任務由于GIL的存在我們通常使用多進程來實現,而對與IO型任務我們可以通過線程調度來讓線程在執行IO任務時讓出GIL,從而實現表面上的并發。
其實對于IO型任務我們還有一種選擇就是協程, 協程是運行在單線程當中的“并發” ,協程相比多線程一大優勢就是省去了多線程之間的切換開銷,獲得了更大的運行效率。Python中的asyncio也是基于協程來進行實現的。在進入asyncio之前我們先來了解一下Python中怎么通過生成器進行協程來實現并發。
example1
我們先來看一個簡單的例子來了解一下什么是協程(coroutine)。
>>> def coroutine():
... reply = yield 'hello'
... yield reply
...
>>> c = coroutine()
>>> next(c)
'hello'
>>> c.send('world')
'world'
example2
下面這個程序我們要實現的功能就是 模擬多個學生同時向一個老師提交作業 ,按照傳統的話我們或許要采用多線程/多進程,但是這里我們可以采用生成器來實現協程用來模擬并發。
如果下面這個程序讀起來有點困難,可以直接跳到后面部分,并不影響閱讀,等你理解協程的本質,回過頭來看就很簡單了。
from collections import deque
def student(name, homeworks):
for homeworkin homeworks.items():
yield (name, homework[0], homework[1]) # 學生"生成"作業給老師
class Teacher(object):
def __init__(self, students):
self.students = deque(students)
def handle(self):
"""老師處理學生作業"""
while len(self.students):
student = self.students.pop()
try:
homework = next(student)
print('handling', homework[0], homework[1], homework[2])
except StopIteration:
pass
else:
self.students.appendleft(student)
下面我們來調用一下這個程序。
Teacher([
student('Student1', {'math': '1+1=2', 'cs': 'operating system'}),
student('Student2', {'math': '2+2=4', 'cs': 'computer graphics'}),
student('Student3', {'math': '3+3=5', 'cs': 'compiler construction'})
]).handle()
這是輸出結果,我們僅僅只用了一個簡單的生成器就實現了并發(concurrence),注意不是并行(parallel),因為我們的程序僅僅是運行在一個單線程當中。
handlingStudent3cscompiler construction
handlingStudent2cscomputergraphics
handlingStudent1csoperatingsystem
handlingStudent3math 3+3=5
handlingStudent2math 2+2=4
handlingStudent1math 1+1=2
##使用asyncio模塊實現協程
從Python3.4開始asyncio模塊加入到了標準庫,通過asyncio我們可以輕松實現協程來完成異步IO操作。
解釋一下下面這段代碼,我們創造了一個協程 display_date(num, loop) ,然后它使用關鍵字 yield from 來等待協程 asyncio.sleep(2) 的返回結果。而在這等待的2s之間它會讓出CPU的執行權,直到asyncio.sleep(2)返回結果。
# coroutine.py
import asyncio
import datetime
@asyncio.coroutine # 聲明一個協程
def display_date(num, loop):
end_time = loop.time() + 10.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
yield from asyncio.sleep(2) # 阻塞直到協程sleep(2)返回結果
loop = asyncio.get_event_loop() # 獲取一個event_loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks)) # "阻塞"直到所有的tasks完成
loop.close()
下面是運行結果,注意到并發的效果沒有,程序從開始到結束只用大約10s,而在這里我們并沒有使用任何的多線程/多進程代碼。在實際項目中你可以將asyncio.sleep(secends)替換成相應的IO任務,比如數據庫/磁盤文件讀寫等操作。
ziwenxie :: ~ ? pythoncoroutine.py
Loop: 1 Time: 2016-12-19 16:06:46.515329
Loop: 2 Time: 2016-12-19 16:06:46.515446
Loop: 1 Time: 2016-12-19 16:06:48.517613
Loop: 2 Time: 2016-12-19 16:06:48.517724
Loop: 1 Time: 2016-12-19 16:06:50.520005
Loop: 2 Time: 2016-12-19 16:06:50.520169
Loop: 1 Time: 2016-12-19 16:06:52.522452
Loop: 2 Time: 2016-12-19 16:06:52.522567
Loop: 1 Time: 2016-12-19 16:06:54.524889
Loop: 2 Time: 2016-12-19 16:06:54.525031
Loop: 1 Time: 2016-12-19 16:06:56.527713
Loop: 2 Time: 2016-12-19 16:06:56.528102
在Python3.5中為我們提供更直接的對協程的支持,引入了async/await關鍵字,上面的代碼我們可以這樣改寫,使用async代替了@asyncio.coroutine,使用了await代替了yield from,這樣我們的代碼變得更加簡潔可讀。
import asyncio
import datetime
asyncdef display_date(num, loop): # 聲明一個協程
end_time = loop.time() + 10.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
awaitasyncio.sleep(2) # 等同于yield from
loop = asyncio.get_event_loop() # 獲取一個event_loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks)) # "阻塞"直到所有的tasks完成
loop.close()
asyncio模塊詳解
開啟事件循環有兩種方法,一種方法就是通過調用 run_until_complete ,另外一種就是調用 run_forever 。run_until_complete內置add_done_callback,使用run_forever的好處是可以通過自己自定義add_done_callback,具體差異請看下面兩個例子。
run_until_complete()
import asyncio
asyncdef slow_operation(future):
awaitasyncio.sleep(1)
future.set_result('Future is done!')
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
print(loop.is_running()) # False
loop.run_until_complete(future)
print(future.result())
loop.close()
run_forever()
run_forever相比run_until_complete的優勢是添加了一個add_done_callback,可以讓我們在task(future)完成的時候調用相應的方法進行后續處理。
import asyncio
asyncdef slow_operation(future):
awaitasyncio.sleep(1)
future.set_result('Future is done!')
def got_result(future):
print(future.result())
loop.stop()
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
loop.run_forever()
finally:
loop.close()
這里還要注意一點,即使你調用了協程方法,但是如果事件循環沒有開啟,協程也不會執行,參考官方文檔的描述,我剛被坑過。
Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.
Call
call_soon()
import asyncio
def hello_world(loop):
print('Hello World')
loop.stop()
loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
下面是運行結果,我們可以通過call_soon提前注冊我們的task,并且也可以根據返回的 Handle 進行cancel。
HelloWorld
call_later()
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
loop.call_later(1, display_date, end_time, loop)
else:
loop.stop()
loop = asyncio.get_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
改動一下上面的例子我們來看一下call_later的用法,注意這里并沒有像上面那樣使用while循環進行操作,我們可以通過call_later來設置每隔1秒去調用display_date()方法。
2016-12-24 19:17:13.421649
2016-12-24 19:17:14.422933
2016-12-24 19:17:15.424315
2016-12-24 19:17:16.425571
2016-12-24 19:17:17.426874
Chain coroutines
import asyncio
asyncdef compute(x, y):
print("Compute %s + %s ..." % (x, y))
awaitasyncio.sleep(1.0) # 協程compute不會繼續往下面執行,直到協程sleep返回結果
return x + y
asyncdef print_sum(x, y):
result = awaitcompute(x, y) # 協程print_sum不會繼續往下執行,直到協程compute返回結果
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
下面是輸出結果
ziwenxie :: ~ ? pythonchain.py
Compute 1 + 2 ...
1 + 2 = 3
在爬蟲中使用asyncio來實現異步IO
下面我們來通過一個簡單的例子來看一下怎么在Python爬蟲項目中使用asyncio。by the way: 根據我有限的實驗結果,如果要充分發揮asynio的威力,應該使用 aiohttp 而不是requests。而且也要 合理使用 concurrent.futures 模塊提供的線程池/進程池,這一點我會在下一篇博文描述。
import asyncio
import requests
asyncdef spider(loop):
# run_in_exectuor會返回一個Future,而不是coroutine object
future1 = loop.run_in_executor(None, requests.get, 'https://www.python.org/')
future2 = loop.run_in_executor(None, requests.get, 'http://httpbin.org/')
# 通過命令行可以發現上面兩個網絡IO在并發進行
response1 = awaitfuture1 # 阻塞直到future1完成
response2 = awaitfuture2 # 阻塞直到future2完成
print(len(response1.text))
print(len(response2.text))
return 'done'
loop = asyncio.get_event_loop()
# If the argument is a coroutine object, it is wrapped by ensure_future().
result = loop.run_until_complete(spider(loop))
print(result)
loop.close()
p.s: 如果你能自己體會到為什么盲目地使用線程池/進程池并不能提高基于asynico模塊的程序的效率,我想你對協程的理解也差不多了。
References
來自:http://python.jobbole.com/87202/