Python 異步網絡爬蟲 I
本文主要討論下面幾個問題:
-
什么是異步(Asynchronous)編程?
-
為什么要使用異步編程?
-
在 Python 中有哪些實現異步編程的方法?
-
Python 3.5 如何使用 async/await 實現異步網絡爬蟲?
所謂 異步 是相對于 同步(Synchronous) 的概念來說的,之所以容易造成混亂,是因為剛開始接觸這兩個概念時容易把 同步 看做是 同時 ,而 同時 不是意味著 并行(Parallel) 嗎?然而實際上同步或者異步是針對于 時間軸 的概念,同步意味著 順序、統一的時間軸 ,而異步則意味著 亂序、效率優先的時間軸 。比如在爬蟲運行時,先抓取 A 頁面,然后從中提取下一層頁面 B 的鏈接,此時的爬蟲程序的運行只能是同步的,B 頁面只能等到 A 頁面處理完成之后才能抓取;然而對于獨立的兩個頁面 A1 和 A2,在處理 A1 網絡請求的時間里,與其讓 CPU 空閑而 A2 等在后面,不如先處理 A2,等到誰先完成網絡請求誰就先來進行處理,這樣可以更加充分地利用 CPU,但是 A1 和 A2 的執行順序則是不確定的,也就是異步的。
很顯然,在某些情況下采用異步編程可以提高程序運行效率,減少不必要的等待時間,而之所以能夠做到這一點,是因為計算機的 CPU 與其它設備是獨立運作的,同時 CPU 的運行效率遠高于其他設備的讀寫(I/O)效率。為了利用異步編程的優勢,人們想出了很多方法來重新安排、調度(Schedule)程序的運行順序,從而最大化 CPU 的使用率,其中包括進程、線程、協程等(具體可參考《 Python 中的進程、線程、協程、同步、異步、回調 》)。在 Python 3.5 以前通過 @types.coroutine 作為修飾器的方式將一個生成器(Generator)轉化為一個協程,而在 Python 3.5 中則通過關鍵詞 async/await 來定義一個協程,同時也將 asyncio 納入為標準庫,用于實現基于協程的異步編程。
要使用 asyncio 需要理解下面幾個概念:
-
Event loop
-
Coroutine
-
Future & Task
Event loop
了解 JavaScript 或 Node.js 肯定對事件循環不陌生,我們可以把它看作是一種循環式(loop)的調度機制,它可以安排需要 CPU 執行的操作優先執行,而會被 I/O 阻塞的行為則進入等待隊列:
asyncio 自帶了事件循環:
import asyncio
loop = anscio.get_event_loop()
# loop.run_until_complete(coro())
loop.close()
當然你也可以選擇其它的實現形式,例如 Sanic 框架采用的 uvloop ,用起來也非常簡單( 至于性能上是否更優我沒有驗證過,但至少在 Jupyter Notebook 上 uvloop 用起來更方便):
import asyncio
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
Coroutine
Python 3.5 以后推薦使用 async/await 關鍵詞來定義協程,它具有如下特性:
- 通過 await 將可能阻塞的行為掛起,直到有結果之后繼續執行,Event loop 也是據此來對多個協程的執行進行調度的;
- 協程并不像一般的函數一樣,通過 coro() 進行調用并不會執行它,而只有將它放入 Event loop 進行調度才能執行。
一個簡單的例子:
import uvloop
import asyncio
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
async def compute(a, b):
print("Computing {} + {}...".format(a, b))
await asyncio.sleep(a+b)
return a + b
tasks = []
for i, j in zip(range(3), range(3)):
print(i, j)
tasks.append(compute(i, j))
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
### OUTPUT
"""
0 0
1 1
2 2
Computing 0 + 0...
Computing 1 + 1...
Computing 2 + 2...
CPU times: user 1.05 ms, sys: 1.21 ms, total: 2.26 ms
Wall time: 4 s
"""
由于我們沒辦法知道協程將在什么時候調用及返回, asyncio 中提供了 Future 這一對象來追蹤它的執行結果。
Future & Task
Future 相當于 JavaScript 中的 Promise ,用于保存 未來 可能返回的結果。而 Task 則是 Future 的子類,與 Future 不同的是它包含了一個將要執行的協程( 從而組成一個需要被調度的任務)。還以上面的程序為例,如果想要知道計算結果,可以通過 asyncio.ensure_future() 方法將協程包裹成 Task ,最后再來讀取結果:
import uvloop
import asyncio
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
async def compute(a, b):
print("Computing {} + {}...".format(a, b))
await asyncio.sleep(a+b)
return a + b
tasks = []
for i, j in zip(range(3), range(3)):
print(i, j)
tasks.append(asyncio.ensure_future(compute(i, j)))
loop.run_until_complete(asyncio.gather(*tasks))
for t in tasks:
print(t.result())
loop.close()
### OUTPUT
"""
0 0
1 1
2 2
Computing 0 + 0...
Computing 1 + 1...
Computing 2 + 2...
0
2
4
CPU times: user 1.62 ms, sys: 1.86 ms, total: 3.49 ms
Wall time: 4.01 s
"""
異步網絡請求
Python 處理網絡請求最好用的庫就是 requests (應該沒有之一),但由于它的請求過程是同步阻塞的,因此只好選用 aiohttp 。為了對比同步與異步情況下的差異,先偽造一個假的異步處理服務器:
from sanic import Sanic
from sanic.response import text
import asyncio
app = Sanic(__name__)
@app.route("/<word>")
@app.route("/")
async def index(req, word=""):
t = len(word) / 10
await asyncio.sleep(t)
return text("It costs {}s to process `{}`!".format(t, word))
app.run()
服務器處理耗時與請求參數( word )長度成正比,采用同步請求方式,運行結果如下:
import requests as req
URL = "http://127.0.0.1:8000/{}"
words = ["Hello", "Python", "Fans", "!"]
for word in words:
resp = req.get(URL.format(word))
print(resp.text)
### OUTPUT
"""
It costs 0.5s to process `Hello`!
It costs 0.6s to process `Python`!
It costs 0.4s to process `Fans`!
It costs 0.1s to process `!`!
CPU times: user 18.5 ms, sys: 2.98 ms, total: 21.4 ms
Wall time: 1.64 s
"""
采用異步請求,運行結果如下:
import asyncio
import aiohttp
import uvloop
URL = "http://127.0.0.1:8000/{}"
words = ["Hello", "Python", "Fans", "!"]
async def getPage(session, word):
with aiohttp.Timeout(10):
async with session.get(URL.format(word)) as resp:
print(await resp.text())
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
session = aiohttp.ClientSession(loop=loop)
tasks = []
for word in words:
tasks.append(getPage(session, word))
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
session.close()
### OUTPUT
"""
It costs 0.1s to process `!`!
It costs 0.4s to process `Fans`!
It costs 0.5s to process `Hello`!
It costs 0.6s to process `Python`!
CPU times: user 61.2 ms, sys: 18.2 ms, total: 79.3 ms
Wall time: 732 ms
"""
從運行時間上來看效果是很明顯的。
參考
來自:http://blog.rainy.im/2016/10/30/python-async-webscraper-i/