從0到1,Python異步編程的演進之路

本文將通過一些例子來講述作為Python開發者有哪些常用的方式來實現異步編程,以及分享個人對異步編程的理解,如有錯誤,歡迎指正。
先從一個例子說起。
小梁是一個忠實的電影好愛者,有一天,小梁看到豆瓣這個網站,發現了很多自己喜歡的內容,恰好小梁是個程序猿,于是心血來潮的他決定寫個程序,把豆瓣Top250的電影列表給爬下來。小梁平時是個Python發燒友,做起這些事情來自然是得心應手,于是他欣喜地擼起袖子就是干!果不其然,不到十分鐘,小梁就寫好了第一個程序。
#-*- coding:utf-8 -*-
import urllib.request
import ssl
from lxml import etree
url = 'https://movie.douban.com/top250'
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
def fetch_page(url):
response = urllib.request.urlopen(url, context=context)
return response
def parse(url):
response = fetch_page(url)
page = response.read()
html = etree.HTML(page)
xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
xpath_title = './/span[@class="title"]'
xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
pages = html.xpath(xpath_pages)
fetch_list = []
result = []
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for p in pages:
fetch_list.append(url + p.get('href'))
for url in fetch_list:
response = fetch_page(url)
page = response.read()
html = etree.HTML(page)
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for i, movie in enumerate(result, 1):
title = movie.find(xpath_title).text
print(i, title)
def main():
parse(url)
if __name__ == '__main__':
main()
程序也不出意外地正常運行。

但是,這個程序讓人感覺比較慢,有多慢呢?小梁在主函數中加了下面一段代碼。
def main():
from time import time
start = time()
for i in range(5):
parse(url)
end = time()
print ('Cost {} seconds'.format((end - start) / 5))
發現總共耗時7.6秒!!
python movie.py
Cost 7.619797945022583 seconds
小梁不禁陷入了沉思...

小梁突然想起了兩天前小張同學給他安利的一個庫,叫 requests ,比那urllib,urllib2,urllib3,urllibn...不知高到哪里去了!小梁興致勃勃地修改程序,用requests代替了標準庫urllib。
import requests
from lxml import etree
from time import time
url = 'https://movie.douban.com/top250'
def fetch_page(url):
response = requests.get(url)
return response
def parse(url):
response = fetch_page(url)
page = response.content
html = etree.HTML(page)
xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
xpath_title = './/span[@class="title"]'
xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
pages = html.xpath(xpath_pages)
fetch_list = []
result = []
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for p in pages:
fetch_list.append(url + p.get('href'))
for url in fetch_list:
response = fetch_page(url)
page = response.content
html = etree.HTML(page)
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for i, movie in enumerate(result, 1):
title = movie.find(xpath_title).text
# print(i, title)
結果一測,6.5秒!雖然比用urllib快了1秒多,但是總體來說,他們基本還是處于同一水平線的,程序并沒有快很多,這一點的差距或許是requests對請求做了優化導致的。
python movie_requests.py
Cost 6.540304231643677 seconds
小梁不禁暗想:是我的程序寫的太挫了嗎?會不會是lxml這個庫解析的速度太慢了,用正則表達式會不會好一些?
于是小梁把lxml庫換成了標準的re庫。
#-*- coding:utf-8 -*-
import requests
from time import time
import re
url = 'https://movie.douban.com/top250'
def fetch_page(url):
response = requests.get(url)
return response
def parse(url):
response = fetch_page(url)
page = response.content
fetch_list = set()
result = []
for title in re.findall(rb'<a href=.*\s.*<span class="title">(.*)</span>', page):
result.append(title)
for postfix in re.findall(rb'<a href="(\?start=.*?)"', page):
fetch_list.add(url + postfix.decode())
for url in fetch_list:
response = fetch_page(url)
page = response.content
for title in re.findall(rb'<a href=.*\s.*<span class="title">(.*)</span>', page):
result.append(title)
for i, title in enumerate(result, 1):
title = title.decode()
# print(i, title)
再一跑,咦,又足足提升了將近一秒!
python movie_regex.py
Cost 5.578997182846069 seconds
小梁心里暗爽,程序變得更短了,運行得也更快了,感覺離成功越來越近了,但小梁眉頭一皺,很快地意識到了一個問題,這樣寫出來的程序雖然看起來更短了,但所做的都是在盲目地求 快 ,但完全沒有 擴展性 可言!雖然這樣做可以滿足普通的需求場景,但當程序邏輯變復雜時,依賴原生正則表達式的程序會更加難以維護!借助一些專門做這些事情的解析庫,才能使程序變得清晰。其次,這種網絡應用通常瓶頸都在IO層面,解決等待讀寫的問題比提高文本解析速度來的更有性價比!小梁想起了昨天上操作系統課時老師講的多進程和多線程概念,正好用他們來解決實際問題。

#-*- coding:utf-8 -*-
import requests
from lxml import etree
from time import time
from threading import Thread
url = 'https://movie.douban.com/top250'
def fetch_page(url):
response = requests.get(url)
return response
def parse(url):
response = fetch_page(url)
page = response.content
html = etree.HTML(page)
xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
xpath_title = './/span[@class="title"]'
xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
pages = html.xpath(xpath_pages)
fetch_list = []
result = []
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for p in pages:
fetch_list.append(url + p.get('href'))
def fetch_content(url):
response = fetch_page(url)
page = response.content
html = etree.HTML(page)
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
threads = []
for url in fetch_list:
t = Thread(target=fetch_content, args=[url])
t.start()
threads.append(t)
for t in threads:
t.join()
for i, movie in enumerate(result, 1):
title = movie.find(xpath_title).text
# print(i, title)
效果果然立竿見影!多線程有效的解決了阻塞等待的問題,這個程序足足比之前的程序快了80%!只需要1.4秒就可完成電影列表的抓取。
python movie_multithread.py
Cost 1.451986598968506 seconds
但小梁還是覺得不夠過癮,既然Python的多線程也受制于GIL,為什么我不用多進程呢?于是話不多說又擼出了一個基于多進程的版本。用4個進程的進程池來并行處理網絡數據。
#-*- coding:utf-8 -*-
import requests
from lxml import etree
from time import time
from concurrent.futures import ProcessPoolExecutor
url = 'https://movie.douban.com/top250'
def fetch_page(url):
response = requests.get(url)
return response
def fetch_content(url):
response = fetch_page(url)
page = response.content
return page
def parse(url):
page = fetch_content(url)
html = etree.HTML(page)
xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
xpath_title = './/span[@class="title"]'
xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
pages = html.xpath(xpath_pages)
fetch_list = []
result = []
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for p in pages:
fetch_list.append(url + p.get('href'))
with ProcessPoolExecutor(max_workers=4) as executor:
for page in executor.map(fetch_content, fetch_list):
html = etree.HTML(page)
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for i, movie in enumerate(result, 1):
title = movie.find(xpath_title).text
# print(i, title)
結果是2秒,甚至還不如多線程的版本。
python movie_multiprocess.py
Cost 2.029435634613037 seconds
(注:ThreadPoolExecutor和ProcessPoolExecutor是Python3.2之后引入的分別對線程池和進程池的一個封裝,如果使用Python2.x,需要安裝 futures 這個庫才能使用它們。)
小梁立馬就傻眼了,這跟他的預期完全不符合啊。

多進程帶來的優點(cpu處理)并沒有得到體現, 反而創建和調度進程帶來的開銷要遠超出它的正面效應 ,拖了一把后腿。即便如此,多進程帶來的效益相比于之前單進程單線程的模型要好得多。

正當小梁在苦苦思索還有什么方法可以提高性能時,他無意中看到一篇文章,里面提到了協程相比于多進程和多線程的優點( 多進程和多線程除了創建的開銷大之外還有一個難以根治的缺陷,就是處理進程之間或線程之間的協作問題,因為是依賴多進程和多線程的程序在不加鎖的情況下通常是不可控的,而協程則可以完美地解決協作問題,由用戶來決定協程之間的調度。 ),小梁折騰起來也是不甘人后啊,他搜索了一些資料,思考如何用協程來加強自己的程序。
很快,小梁就發現了一個基于協程的網絡庫,叫做gevent,而且更爽的是,聽說用了gevent的猴子補丁后,整個程序就會變成異步的了!

真的有那么神奇嗎?小梁迫不及待地要看看這到底是什么黑科技!馬上寫出了基于gevent的栗子:
#-*- coding:utf-8 -*-
import requests
from lxml import etree
from time import time
import gevent
from gevent import monkey
monkey.patch_all()
url = 'https://movie.douban.com/top250'
def fetch_page(url):
response = requests.get(url)
return response
def fetch_content(url):
response = fetch_page(url)
page = response.content
return page
def parse(url):
page = fetch_content(url)
html = etree.HTML(page)
xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
xpath_title = './/span[@class="title"]'
xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
pages = html.xpath(xpath_pages)
fetch_list = []
result = []
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for p in pages:
fetch_list.append(url + p.get('href'))
jobs = [gevent.spawn(fetch_content, url) for url in fetch_list]
gevent.joinall(jobs)
[job.value for job in jobs]
for page in [job.value for job in jobs]:
html = etree.HTML(page)
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for i, movie in enumerate(result, 1):
title = movie.find(xpath_title).text
# print(i, title)
只有1.2秒,果然很快!而且我們看整個程序,幾乎看不到有異步處理的影子, gevent給予了我們一種以同步邏輯來書寫異步程序的能力
,看monkey.patch_all()這段代碼,它是整個程序實現異步的黑科技,當我們給程序打了猴子補丁后,Python程序在運行時會動態地將一些網絡庫(例如socket,thread)替換掉,變成異步的庫。使得程序在進行網絡操作的時候都變成異步的方式去工作,效率就自然提升很多了。
python movie_gevent.py
Cost 1.2647549629211425 seconds
雖然程序變得很快了,但小梁整個人都是懵逼的啊,gevent的魔術給他帶來了一定的困惑,而且他覺得gevent這玩意實在不好學,跟他心目中Pythonic的清晰優雅還是有距離的。Python社區也意識到Python需要一個獨立的標準庫來支持協程,于是就有了后來的asyncio。
小梁把同步的requests庫改成了支持asyncio的aiohttp庫,使用3.5的async/await語法( 3.5之前用@asyncio.coroutine和yield from代替 )寫出了協程版本的例子。
#-*- coding:utf-8 -*-
from lxml import etree
from time import time
import asyncio
import aiohttp
url = 'https://movie.douban.com/top250'
async def fetch_content(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def parse(url):
page = await fetch_content(url)
html = etree.HTML(page)
xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
xpath_title = './/span[@class="title"]'
xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
pages = html.xpath(xpath_pages)
fetch_list = []
result = []
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for p in pages:
fetch_list.append(url + p.get('href'))
tasks = [fetch_content(url) for url in fetch_list]
pages = await asyncio.gather(*tasks)
for page in pages:
html = etree.HTML(page)
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for i, movie in enumerate(result, 1):
title = movie.find(xpath_title).text
# print(i, title)
def main():
loop = asyncio.get_event_loop()
start = time()
for i in range(5):
loop.run_until_complete(parse(url))
end = time()
print ('Cost {} seconds'.format((end - start) / 5))
loop.close()
1.7秒,也不錯。而且用上了async/await語法使得程序的可讀性提高了不少。
python movie_asyncio.py
Cost 1.713043785095215 seconds
經過一番洗禮后,小梁對異步有了更加深刻的認識。異步方式有很多,這里列出了比較常見的幾種,在實際使用中,應該根據使用場景來挑選最合適的應用方案,影響程序效率的因素有很多,以上不同的異步方式在不同的場景下也會有不一樣的表現,不要抱死在一個大樹上,該用同步的地方用同步,該用異步的地方異步,這樣才能構建出更加靈活的網絡應用。
說到這里你估計也明白了, 清晰優雅的協程可以說實現異步的最優方案之一 。
協程的機制使得我們可以用同步的方式寫出異步運行的代碼。
總所周知,Python因為有GIL(全局解釋鎖)這玩意,不可能有真正的多線程的存在,因此很多情況下都會用multiprocessing實現并發,而且在Python中應用多線程還要注意關鍵地方的同步,不太方便,用協程代替多線程和多進程是一個很好的選擇,因為它吸引人的特性: 主動調用/退出,狀態保存,避免cpu上下文切換 等…
什么是協程?
協程,又稱作 Coroutine 。從字面上來理解,即協同運行的例程,它是比是線程(thread)更細量級的用戶態線程,特點是允許用戶的主動調用和主動退出,掛起當前的例程然后返回值或去執行其他任務,接著返回原來停下的點繼續執行。等下,這是否有點奇怪?我們都知道一般函數都是線性執行的,不可能說執行到一半返回,等會兒又跑到原來的地方繼續執行。但一些熟悉python(or其他動態語言)的童鞋都知道這可以做到,答案是用yield語句。其實這里我們要感謝操作系統(OS)為我們做的工作,因為它具有getcontext和swapcontext這些特性,通過系統調用,我們可以把上下文和狀態保存起來,切換到其他的上下文,這些特性為coroutine的實現提供了底層的基礎。操作系統的Interrupts和Traps機制則為這種實現提供了可能性,因此它看起來可能是下面這樣的:

理解生成器(generator)
學過生成器和迭代器的同學應該都知道python有yield這個關鍵字,yield能把一個函數變成一個generator,與return不同,yield在函數中返回值時會保存函數的狀態,使下一次調用函數時會從上一次的狀態繼續執行,即從yield的下一條語句開始執行,這樣做有許多好處,比如我們想要生成一個數列,若該數列的存儲空間太大,而我們僅僅需要訪問前面幾個元素,那么yield就派上用場了,它實現了這種一邊循環一邊計算的機制,節省了存儲空間,提高了運行效率。
這里以斐波那契數列為例:
def fib(max):
n, a, b = 0, 0, 1
while n max:
print b
a, b = b, a + b
n = n + 1
如果使用上述的算法,那么我每一次調用函數時,都要耗費大量時間循環做重復的事情。而如果使用yield的話,它則會生成一個generator,當我需要時,調用它的next方法獲得下一個值,改動的方法很簡單,直接把print改為yield就OK。
生產者-消費者的協程
#-*- coding:utf-8
def consumer():
status = True
while True:
n = yield status
print("我拿到了{}!".format(n))
if n == 3:
status = False
def producer(consumer):
n = 5
while n > 0:
# yield給主程序返回消費者的狀態
yield consumer.send(n)
n -= 1
if __name__ == '__main__':
c = consumer()
c.send(None)
p = producer(c)
for status in p:
if status == False:
print("我只要3,4,5就行啦")
break
print("程序結束")
上面這個例子是典型的生產者-消費者問題,我們用協程的方式來實現它。首先從主程序中開始看,第一句c = consumer(),因為consumer函數中存在yield語句,python會把它當成一個generator(生成器,注意:生成器和協程的概念區別很大,千萬別混淆了兩者),因此在運行這條語句后,python并不會像執行函數一樣,而是返回了一個generator object。
再看第二條語句c.send(None),這條語句的作用是將consumer(即變量c,它是一個generator)中的語句推進到第一個yield語句出現的位置,那么在例子中,consumer中的status = True和while True:都已經被執行了,程序停留在n = yield status的位置(注意:此時這條語句還沒有被執行),上面說的send(None)語句十分重要,如果漏寫這一句,那么程序直接報錯,這個send()方法看上去似乎挺神奇,等下再講它的作用。
下面第三句p = producer(c),這里則像上面一樣定義了producer的生成器,注意的是這里我們傳入了消費者的生成器,來讓producer跟consumer通信。
第四句for status in p:,這條語句會循環地運行producer和獲取它yield回來的狀態。
好了,進入正題, 現在我們要讓生產者發送1,2,3,4,5給消費者,消費者接受數字,返回狀態給生產者,而我們的消費者只需要3,4,5就行了,當數字等于3時,會返回一個錯誤的狀態。最終我們需要由主程序來監控生產者-消費者的過程狀態,調度結束程序。
現在程序流進入了producer里面,我們直接看yield consumer.send(n),生產者調用了消費者的send()方法,把n發送給consumer(即c),在consumer中的n = yield status,n拿到的是消費者發送的數字,同時,consumer用yield的方式把狀態(status)返回給消費者,注意:這時producer(即消費者)的consumer.send()調用返回的就是consumer中yield的status!消費者馬上將status返回給調度它的主程序,主程序獲取狀態,判斷是否錯誤,若錯誤,則終止循環,結束程序。上面看起來有點繞,其實這里面generator.send(n)的作用是:把n發送generator(生成器)中yield的賦值語句中,同時返回generator中yield的變量(結果)。
于是程序便一直運作,直至consumer中獲取的n的值變為3!此時consumer把status變為False,最后返回到主程序,主程序中斷循環,程序結束。
輸出結果:
我拿到了5!
我拿到了4!
我拿到了3!
我只要3,4,5就行啦
程序結束
Coroutine與Generator
有些人會把生成器(generator)和協程(coroutine)的概念混淆,我以前也會這樣,不過其實發現,兩者的區別還是很大的。
直接上最重要的區別:
-
generator總是生成值,一般是迭代的序列
-
coroutine關注的是消耗值,是數據(data)的消費者
-
coroutine不會與迭代操作關聯,而generator會
-
coroutine強調協同控制程序流,generator強調保存狀態和產生數據
相似的是,它們都是不用return來實現重復調用的函數/對象,都用到了yield(中斷/恢復)的方式來實現。
asyncio
asyncio是python 3.4中新增的模塊,它提供了一種機制,使得你可以用協程(coroutines)、IO復用(multiplexing I/O)在單線程環境中編寫并發模型。
根據官方說明,asyncio模塊主要包括了:
-
具有特定系統實現的事件循環(event loop);
-
數據通訊和協議抽象(類似Twisted中的部分);
-
TCP,UDP,SSL,子進程管道,延遲調用和其他;
-
Future類;
-
yield from的支持;
-
同步的支持;
-
提供向線程池轉移作業的接口;
下面來看下asyncio的一個例子:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

當事件循環開始運行時,它會在Task中尋找coroutine來執行調度,因為事件循環注冊了 print_sum() ,因此 print_sum() 被調用,執行 result = await compute(x, y) 這條語句(等同于 result = yield from compute(x, y) ),因為 compute() 自身就是一個coroutine,因此 print_sum() 這個協程就會暫時被掛起, compute() 被加入到事件循環中,程序流執行 compute() 中的print語句,打印”Compute %s + %s …”,然后執行了 await asyncio.sleep(1.0) ,因為 asyncio.sleep() 也是一個coroutine,接著 compute() 就會被掛起,等待計時器讀秒,在這1秒的過程中,事件循環會在隊列中查詢可以被調度的coroutine,而因為此前 print_sum() 與 compute() 都被掛起了,因此事件循環會停下來等待協程的調度,當計時器讀秒結束后,程序流便會返回到 compute() 中執行return語句,結果會返回到 print_sum() 中的result中,最后打印result,事件隊列中沒有可以調度的任務了,此時 loop.close() 把事件隊列關閉,程序結束。
會JS的同學是不是感覺倍感親切?沒錯, 事件驅動 模型就是異步編程的重中之重。
最后再通過一個例子,演示 事件驅動 模型的運作原理。
首先,我們用同步的方式,抓取baidu的一百個網頁。
def sync_way():
for i in range(100):
sock = socket.socket()
sock.connect(('www.baidu.com', 80))
print('connected')
request = 'GET {} HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.format('/s?wd={}'.format(i))
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
print('done!!')
from time import time
start = time()
sync_way() #Cost 47.757508993148804 seconds
end = time()
print ('Cost {} seconds'.format(end - start))
總共耗時47秒,這對于一個要求性能的爬蟲來說是不可接受的,看看我們有沒有辦法將這個爬蟲的性能提高十倍以上,把時間縮短到5秒之內。
首先考慮上面這個程序的瓶頸出在哪個地方,經過思考,很容易看出上面的程序有幾個不足之處:
-
socket連接的建立需要等待,一旦握手建立的時間漫長,就會影響下面的流程正常運行。
-
socket接收數據的過程是阻塞式的,等待buffer的過程也是需要一段時間的。
-
socket的建立連接-接收過程都是一個一個來的,在沒完成一個連接時不能進行其他連接的處理。
好了,先解決第一個問題:socket的等待。痛點很明顯,我們不能一直等待socket的狀態發生改變,而是當socket的狀態發生改變時,讓它告訴我們。要解決這個問題,可以利用io復用,先看看io復用的定義:
IO復用:預先告知內核,使內核一旦發現進程指定的一個或多個IO條件就緒(輸入準備被讀取,或描述符能承接更多的輸出),它就通知進程。
阻塞IO模型看起來是這樣的:
recvfrom->無數據報準備好->等待數據->數據報準備好->數據從內核復制到用戶空間->復制完成->返回成功指示
而IO復用模型看起來是這樣的:
select->無數據報準備好->據報準備好->返回可讀條件->recvfrom->數據從內核復制到用戶空間->復制完成->返回成功指示
于是我們可以對上面的代碼這樣修改。
from selectors import DefaultSelector, EVENT_WRITE
selector = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('www.baidu.com', 80))
except BlockingIOError:
pass
def connected():
selector.unregister(sock.fileno())
print('connected!')
selector.register(sock.fileno(), EVENT_WRITE, connected)
把socket設置為非阻塞,把socket的句柄注冊到事件輪詢中,當socket發生可寫事件時,表示socket連接就緒了,這時候再把socket從事件輪詢中刪除,在socket返回可寫事件之前,系統都不是阻塞狀態的。同理,對于socket從網絡中接收數據,也可以用同樣的方法,只需要把要監聽的事件改為可讀事件就行了。
當然,僅僅這樣還是不夠的,試想一下,如果有多個socket進行連接,采用上面的非阻塞方式,當一個socket開始等待事件返回時,理論上系統此時應該做的是處理另一個socket的流程,但這里還缺乏了一個必要的機制,當從一個處理socket流程切到另一個處理socket流程時,原來的流程的上下文狀態該怎么保存下來以便恢復呢,顯然易見這里需要用到上面說到的協程機制,在python中通過yield語法可以把一個函數或方法包裝成一個生成器,當生成器執行yield語句時,生成器內部的上下文狀態就會被保存,如果想要在未來的操作中把這個生成器恢復,只需要調用生成器的send方法即可從原流程中繼續往下走。
有了上面這個概念,我們可以創建一個Future類,它代表了協程中等待的“未來發生的結果”,舉例來說,在發起網絡請求時,socket會在buffer中返回一些數據,這個獲取的動作在異步流程中發生的時間是不確定的,Future就是用來封裝這個未來結果的類,但當socket在某個時間段監測到可讀事件,讀取到數據了,那么他就會把數據寫入Future里,并告知Future要執行某些回調動作。
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for callback in self._callbacks:
callback(self)
有了Future,我們可以包裝一個AsyncRequest類,用以發起異步請求的操作。
class AsyncRequest:
def __init__(self, host, url, port, timeout=5):
self.sock = socket.socket()
self.sock.settimeout(timeout)
self.sock.setblocking(False)
self.host = host
self.url = url
self.port = port
self.method = None
def get(self):
self.method = 'GET'
self.request = '{} {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.method, self.url, self.host)
return self
def process(self):
if self.method is None:
self.get()
try:
self.sock.connect((self.host, self.port))
except BlockingIOError:
pass
self.f = Future()
selector.register(self.sock.fileno(),
EVENT_WRITE,
self.on_connected)
yield self.f
selector.unregister(self.sock.fileno())
self.sock.send(self.request.encode('ascii'))
chunk = yield from read_all(self.sock)
return chunk
def on_connected(self, key, mask):
self.f.set_result(None)
在AsyncRequest的process方法里,實例在發起異步連接請求后通過yield一個future阻斷了程序流,表示他需要等待未來發生的動作發生(在這里是等待socket可寫),這時候系統會去執行其他事件,當未來socket變成可寫時,future被寫入數據,同時執行回調,從原來停下的地方開始執行,執行讀取socket數據的處理。
這里關鍵的地方就是future在yield之后會在未來某個時候再次被send然后繼續往下走,這時候就需要一個用來驅動Future的類。這里稱為Task,它需要接受一個協程作為參數,并驅動協程的程序流執行。
class Task(Future):
def __init__(self, coro):
super().__init__()
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
def step(self, future):
try:
next_future = self.coro.send(future.result)
if next_future is None:
return
except StopIteration as exc:
self.set_result(exc.value)
return
next_future.add_done_callback(self.step)
最終,整個程序還需要一個EventLoop類,用來監聽到來的事件為socket執行回調以及把協程包裝成Task來實現異步驅動。
class EventLoop:
stopped = False
select_timeout = 5
def run_until_complete(self, coros):
tasks = [Task(coro) for coro in coros]
try:
self.run_forever()
except StopError:
pass
def run_forever(self):
while not self.stopped:
events = selector.select(self.select_timeout)
if not events:
raise SelectTimeout('輪詢超時')
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
def close(self):
self.stopped = True
OK,那么現在用新的方法再測試一遍,通過python3的yield from語法我們把協程操作代理到AsyncRequest類的process方法中,最終把協程放到EventLoop中執行。
def fetch(url):
request = AsyncRequest('www.baidu.com', url, 80)
data = yield from request.process()
return data
def get_page(url):
page = yield from fetch(url)
return page
def async_way():
ev_loop = get_event_loop()
ev_loop.run_until_complete([
get_page('/s?wd={}'.format(i)) for i in range(100)
])
from time import time
start = time()
async_way() # Cost 3.534296989440918 seconds
end = time()
print ('Cost {} seconds'.format(end - start))
可以看到總共耗時3.5秒,通過把同步改寫成基于事件驅動的異步,整個程序的效率提高的十倍以上。
有了上面的基礎,可以更進一步改寫出一個的任務隊列的異步處理形式,把EventLoop的實現隱藏,提供更簡單的接口。
from collections import deque
class Queue:
def __init__(self):
self._q = deque()
self.size = 0
def put(self, item):
self.size += 1
self._q.append(item)
def get(self):
item = self._q.popleft()
return item
def task_done(self):
self.size -= 1
if self.size == 0:
self.empty_callback()
class AsyncWorker(Queue):
def __init__(self, coroutine, workers=10, loop_timeout=5):
super().__init__()
self.func = coroutine
self.stopped = False
self.ev_loop = get_event_loop()
self.ev_loop.select_timeout = loop_timeout
self.workers = workers
self.result_callbacks = []
def work(self):
def _work():
while not self.stopped:
item = None
try:
item = self.get()
except IndexError:
yield None
result = yield from self.func(item)
self.task_done()
for callback in self.result_callbacks:
callback(result)
self.tasks = []
for _ in range(self.workers):
self.tasks.append(_work())
self.ev_loop.run_until_complete(self.tasks)
def add_result_callback(self, func):
self.result_callbacks.append(func)
def empty_callback(self):
self.ev_loop.close()
def print_content_length(data):
print(len(data))
async_worker = AsyncWorker(get_page, workers=20)
async_worker.add_result_callback(print_content_length)
for i in range(15):
async_worker.put('/s?wd={}'.format(i))
async_worker.work()
參考文獻:
A Web Crawler With asyncio Coroutines
來自:https://zhuanlan.zhihu.com/p/25228075